You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/01 15:45:29 UTC

[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-2893] Double live for 0.13 (#5574)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.13 by this push:
     new 5692bdf6ed [To rel/0.13][IOTDB-2893] Double live for 0.13 (#5574)
5692bdf6ed is described below

commit 5692bdf6ed54a928267acbe41b66ed53824126a7
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun May 1 23:45:23 2022 +0800

    [To rel/0.13][IOTDB-2893] Double live for 0.13 (#5574)
---
 .../apache/iotdb/session/SessionCacheLeaderUT.java |   0
 .../apache/iotdb/session/util/ThreadUtilsTest.java |   0
 server/pom.xml                                     |   5 +
 .../resources/conf/iotdb-engine.properties         |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 124 ++++++++++++
 .../org/apache/iotdb/db/conf/IoTDBConstant.java    |   3 +
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  56 ++++++
 .../iotdb/db/doublelive/OperationSyncConsumer.java |  94 +++++++++
 .../db/doublelive/OperationSyncDDLProtector.java   |  81 ++++++++
 .../db/doublelive/OperationSyncDMLProtector.java   |  55 ++++++
 .../db/doublelive/OperationSyncLogService.java     | 219 +++++++++++++++++++++
 .../db/doublelive/OperationSyncPlanTypeUtils.java  |  53 +++++
 .../iotdb/db/doublelive/OperationSyncProducer.java |  54 +++++
 .../db/doublelive/OperationSyncProtector.java      | 174 ++++++++++++++++
 .../db/doublelive/OperationSyncWriteTask.java      |  87 ++++++++
 .../db/service/thrift/impl/TSServiceImpl.java      | 163 ++++++++++++++-
 .../doublelive/OperationSyncManualTestUtils.java   | 129 ++++++++++++
 .../java/org/apache/iotdb/session/Session.java     |  19 ++
 .../apache/iotdb/session/SessionConnection.java    |  20 ++
 .../org/apache/iotdb/session/pool/SessionPool.java |  23 +++
 thrift/rpc-changelist.md                           |  13 +-
 thrift/src/main/thrift/rpc.thrift                  |   8 +
 22 files changed, 1372 insertions(+), 10 deletions(-)

diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
similarity index 100%
rename from session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
rename to integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
diff --git a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java b/integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
similarity index 100%
rename from session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
rename to integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
diff --git a/server/pom.xml b/server/pom.xml
index 0dfccd31ec..fc9cc02630 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -45,6 +45,11 @@
             <artifactId>iotdb-antlr</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iotdb</groupId>
+            <artifactId>iotdb-session</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.iotdb</groupId>
             <artifactId>tsfile</artifactId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index f8690c3586..bbfbb0be45 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -931,4 +931,4 @@ timestamp_precision=ms
 ### Group By Fill Configuration
 ####################
 # Datatype: float
-# group_by_fill_cache_size_in_mb=1.0
+# group_by_fill_cache_size_in_mb=1.0
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1bf37275b4..a64066b00d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -820,6 +820,33 @@ public class IoTDBConfig {
   /** Encryption provided class parameter */
   private String encryptDecryptProviderParameter;
 
+  // Operation Sync Config
+  private boolean enableOperationSync = false;
+
+  // Secondary IoTDB
+  private String secondaryAddress = "127.0.0.1";
+  private int secondaryPort = 6668;
+  private String secondaryUser = "root";
+  private String secondaryPassword = "root";
+
+  // The transmitting concurrency size of operation sync SessionPool
+  private int OperationSyncSessionConcurrencySize = 8;
+
+  // OperationSyncLog dir
+  private String operationSyncLogDir =
+      DEFAULT_BASE_DIR + File.separator + IoTDBConstant.OPERATION_SYNC_FOLDER_NAME;
+  // The validity of each OperationSyncLog
+  private int operationSyncLogValidity = 30;
+  // The maximum id of OperationSyncLog
+  private int operationSyncLogNum = 32767;
+  // The max size of all the OperationSyncLog. Default is 100GB
+  private long operationSyncMaxLogSize = 107374182400L;
+
+  // OperationSyncProducer DML cache size
+  private int operationSyncProducerCacheSize = 1024;
+  // OperationSyncConsumer concurrency size
+  private int operationSyncConsumerConcurrencySize = 4;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -953,6 +980,7 @@ public class IoTDBConfig {
     extDir = addHomeDir(extDir);
     udfDir = addHomeDir(udfDir);
     triggerDir = addHomeDir(triggerDir);
+    operationSyncLogDir = addHomeDir(operationSyncLogDir);
 
     if (TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS)) {
       String hdfsDir = getHdfsDir();
@@ -2565,4 +2593,100 @@ public class IoTDBConfig {
   public void setEncryptDecryptProviderParameter(String encryptDecryptProviderParameter) {
     this.encryptDecryptProviderParameter = encryptDecryptProviderParameter;
   }
+
+  public boolean isEnableOperationSync() {
+    return enableOperationSync;
+  }
+
+  public void setEnableOperationSync(boolean enableOperationSync) {
+    this.enableOperationSync = enableOperationSync;
+  }
+
+  public String getSecondaryAddress() {
+    return secondaryAddress;
+  }
+
+  public void setSecondaryAddress(String secondaryAddress) {
+    this.secondaryAddress = secondaryAddress;
+  }
+
+  public int getSecondaryPort() {
+    return secondaryPort;
+  }
+
+  public void setSecondaryPort(int secondaryPort) {
+    this.secondaryPort = secondaryPort;
+  }
+
+  public String getSecondaryUser() {
+    return secondaryUser;
+  }
+
+  public void setSecondaryUser(String secondaryUser) {
+    this.secondaryUser = secondaryUser;
+  }
+
+  public String getSecondaryPassword() {
+    return secondaryPassword;
+  }
+
+  public void setSecondaryPassword(String secondaryPassword) {
+    this.secondaryPassword = secondaryPassword;
+  }
+
+  public int getOperationSyncSessionConcurrencySize() {
+    return OperationSyncSessionConcurrencySize;
+  }
+
+  public void setOperationSyncSessionConcurrencySize(int operationSyncSessionConcurrencySize) {
+    this.OperationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
+  }
+
+  public String getOperationSyncLogDir() {
+    return operationSyncLogDir;
+  }
+
+  public void setOperationSyncLogDir(String operationSyncLogDir) {
+    this.operationSyncLogDir = operationSyncLogDir;
+  }
+
+  public int getOperationSyncLogValidity() {
+    return operationSyncLogValidity;
+  }
+
+  public void setOperationSyncLogValidity(int operationSyncLogValidity) {
+    this.operationSyncLogValidity = operationSyncLogValidity;
+  }
+
+  public int getOperationSyncLogNum() {
+    return operationSyncLogNum;
+  }
+
+  public void setOperationSyncLogNum(int operationSyncLogNum) {
+    this.operationSyncLogNum = operationSyncLogNum;
+  }
+
+  public long getOperationSyncMaxLogSize() {
+    return operationSyncMaxLogSize;
+  }
+
+  public void setOperationSyncMaxLogSize(long operationSyncMaxLogSize) {
+    this.operationSyncMaxLogSize = operationSyncMaxLogSize;
+  }
+
+  public int getOperationSyncProducerCacheSize() {
+    return operationSyncProducerCacheSize;
+  }
+
+  public void setOperationSyncProducerCacheSize(int operationSyncProducerCacheSize) {
+    this.operationSyncProducerCacheSize = operationSyncProducerCacheSize;
+  }
+
+  public int getOperationSyncConsumerConcurrencySize() {
+    return operationSyncConsumerConcurrencySize;
+  }
+
+  public void setOperationSyncConsumerConcurrencySize(int operationSyncConsumerConcurrencySize) {
+    this.operationSyncConsumerConcurrencySize = operationSyncConsumerConcurrencySize;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index feab1abd37..21228f3cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -154,6 +154,9 @@ public class IoTDBConstant {
   public static final String UDF_FOLDER_NAME = "udf";
   public static final String TRIGGER_FOLDER_NAME = "trigger";
 
+  // Operation Sync folder name
+  public static final String OPERATION_SYNC_FOLDER_NAME = "operationsync";
+
   // mqtt
   public static final String ENABLE_MQTT = "enable_mqtt_service";
   public static final String MQTT_HOST_NAME = "mqtt_host";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6f264adaf1..2e86b8ff86 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -790,6 +790,62 @@ public class IoTDBDescriptor {
               "iotdb_server_encrypt_decrypt_provider_parameter",
               conf.getEncryptDecryptProviderParameter()));
 
+      // set OperationSync config
+      conf.setEnableOperationSync(
+          Boolean.parseBoolean(
+              properties.getProperty(
+                  "enable_operation_sync", String.valueOf(conf.isEnableOperationSync()))));
+
+      conf.setSecondaryAddress(
+          properties.getProperty("secondary_address", conf.getSecondaryAddress()));
+
+      conf.setSecondaryPort(
+          Integer.parseInt(
+              properties.getProperty("secondary_port", String.valueOf(conf.getSecondaryPort()))));
+
+      conf.setSecondaryUser(properties.getProperty("secondary_user", conf.getSecondaryUser()));
+
+      conf.setSecondaryPassword(
+          properties.getProperty("secondary_password", conf.getSecondaryPassword()));
+
+      conf.setOperationSyncSessionConcurrencySize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "operation_sync_session_concurrency_size",
+                  String.valueOf(conf.getOperationSyncSessionConcurrencySize()))));
+
+      conf.setOperationSyncLogDir(
+          properties.getProperty("operation_sync_log_dir", conf.getOperationSyncLogDir()));
+
+      conf.setOperationSyncLogValidity(
+          Integer.parseInt(
+              properties.getProperty(
+                  "operation_sync_log_file_validity",
+                  String.valueOf(conf.getOperationSyncLogValidity()))));
+
+      conf.setOperationSyncLogNum(
+          Integer.parseInt(
+              properties.getProperty(
+                  "operation_sync_log_file_num", String.valueOf(conf.getOperationSyncLogNum()))));
+
+      conf.setOperationSyncMaxLogSize(
+          Long.parseLong(
+              properties.getProperty(
+                  "operation_sync_max_log_size",
+                  String.valueOf(conf.getOperationSyncMaxLogSize()))));
+
+      conf.setOperationSyncProducerCacheSize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "operation_sync_producer_cache_size",
+                  String.valueOf(conf.getOperationSyncProducerCacheSize()))));
+
+      conf.setOperationSyncConsumerConcurrencySize(
+          Integer.parseInt(
+              properties.getProperty(
+                  "operation_sync_consumer_concurrency_size",
+                  String.valueOf(conf.getOperationSyncConsumerConcurrencySize()))));
+
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance()
           .getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
new file mode 100644
index 0000000000..3f05766d53
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+public class OperationSyncConsumer implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
+
+  private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+      OperationSyncQueue;
+  private final SessionPool operationSyncSessionPool;
+  private final OperationSyncLogService dmlLogService;
+
+  public OperationSyncConsumer(
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          OperationSyncQueue,
+      SessionPool operationSyncSessionPool,
+      OperationSyncLogService dmlLogService) {
+    this.OperationSyncQueue = OperationSyncQueue;
+    this.operationSyncSessionPool = operationSyncSessionPool;
+    this.dmlLogService = dmlLogService;
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> head;
+      ByteBuffer headBuffer;
+      OperationSyncPlanTypeUtils.OperationSyncPlanType headType;
+      try {
+        head = OperationSyncQueue.take();
+        headBuffer = head.left;
+        headType = head.right;
+      } catch (InterruptedException e) {
+        LOGGER.error("OperationSyncConsumer been interrupted: ", e);
+        continue;
+      }
+
+      headBuffer.position(0);
+      boolean transmitStatus = false;
+      try {
+        headBuffer.position(0);
+        transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and do serialization
+        LOGGER.warn(
+            "OperationSyncConsumer can't transmit because network failure", connectionException);
+      } catch (Exception e) {
+        // The PhysicalPlan has internal error, reject transmit
+        LOGGER.error("OperationSyncConsumer can't transmit", e);
+        continue;
+      }
+
+      if (!transmitStatus) {
+        try {
+          // must set buffer position to limit() before serialization
+          headBuffer.position(headBuffer.limit());
+          dmlLogService.acquireLogWriter();
+          dmlLogService.write(headBuffer);
+        } catch (IOException e) {
+          LOGGER.error("OperationSyncConsumer can't serialize physicalPlan", e);
+        } finally {
+          dmlLogService.releaseLogWriter();
+        }
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
new file mode 100644
index 0000000000..b2e70c5042
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class OperationSyncDDLProtector extends OperationSyncProtector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);
+
+  private final SessionPool operationSyncSessionPool;
+
+  public OperationSyncDDLProtector(SessionPool operationSyncSessionPool) {
+    super();
+    this.operationSyncSessionPool = operationSyncSessionPool;
+  }
+
+  @Override
+  protected void preCheck() {
+    // do nothing
+  }
+
+  @Override
+  protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+    while (true) {
+      // transmit E-Plan until it's been received
+      boolean transmitStatus = false;
+
+      try {
+        // try operation sync
+        planBuffer.position(0);
+        transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and retry
+        LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+      } catch (Exception e) {
+        // error exception and break
+        LOGGER.error("OperationSyncDDLProtector can't transmit", e);
+        break;
+      }
+
+      if (transmitStatus) {
+        break;
+      } else {
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e) {
+          LOGGER.warn("OperationSyncDDLProtector is interrupted", e);
+        }
+      }
+    }
+  }
+
+  public boolean isAtWork() {
+    return isProtectorAtWork;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
new file mode 100644
index 0000000000..5668d918f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class OperationSyncDMLProtector extends OperationSyncProtector {
+
+  private final OperationSyncDDLProtector ddlProtector;
+  private final OperationSyncProducer producer;
+
+  public OperationSyncDMLProtector(
+      OperationSyncDDLProtector ddlProtector, OperationSyncProducer producer) {
+    super();
+    this.ddlProtector = ddlProtector;
+    this.producer = producer;
+  }
+
+  @Override
+  protected void preCheck() {
+    while (ddlProtector.isAtWork()) {
+      try {
+        TimeUnit.SECONDS.sleep(5);
+      } catch (InterruptedException ignore) {
+        // ignore and retry
+      }
+    }
+  }
+
+  @Override
+  protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+    producer.put(
+        new Pair<>(planBuffer, OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan)));
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
new file mode 100644
index 0000000000..a11d3fee28
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class OperationSyncLogService implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncLogService.class);
+
+  private static final String logFileDir =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogDir();
+  private static final long logFileValidity =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity() * 1000L;
+  private static final int maxLogFileNum =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogNum();
+  private static final long maxLogFileSize =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncMaxLogSize();
+
+  private static long currentLogFileSize = 0;
+
+  private final OperationSyncProtector protector;
+  private final Lock logWriterLock;
+  private final String logFileName;
+  private int logFileID;
+  private long logFileCreateTime;
+  private File logFile;
+  private LogWriter logWriter;
+
+  public OperationSyncLogService(String logFileName, OperationSyncProtector protector) {
+    this.logFileName = logFileName;
+    this.protector = protector;
+
+    this.logWriterLock = new ReentrantLock();
+    this.logFile = null;
+    this.logWriter = null;
+
+    File logDir = new File(logFileDir);
+    if (!logDir.exists()) {
+      if (!logDir.mkdirs()) {
+        LOGGER.error("Can't make OperationSyncLog file dir: {}", logDir.getAbsolutePath());
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    // Check if there exists remnant logs
+    List<Integer> logFileIDList = new ArrayList<>();
+    for (int ID = 0; ID < maxLogFileNum; ID++) {
+      File file =
+          SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + logFileName + ID);
+      if (file.exists()) {
+        logFileIDList.add(ID);
+      }
+    }
+
+    int firstID = 0;
+    if (logFileIDList.size() > 0) {
+      // Re-transmit the remnant logs
+      for (int i = 0; i < logFileIDList.size() - 1; i++) {
+        if (logFileIDList.get(i + 1) - logFileIDList.get(i) > 1) {
+          firstID = i + 1;
+          break;
+        }
+      }
+
+      for (int i = firstID; i < logFileIDList.size(); i++) {
+        protector.registerLogFile(logFileDir + File.separator + logFileName + logFileIDList.get(i));
+      }
+      for (int i = 0; i < firstID; i++) {
+        protector.registerLogFile(logFileDir + File.separator + logFileName + logFileIDList.get(i));
+      }
+
+      int nextID;
+      if (firstID == 0) {
+        nextID = logFileIDList.get(logFileIDList.size() - 1) + 1;
+      } else {
+        nextID = logFileIDList.get(firstID - 1) + 1;
+      }
+      logFileID = nextID % maxLogFileNum;
+    } else {
+      logFileID = 0;
+    }
+
+    while (true) {
+      // Check the validity of logFile
+      logWriterLock.lock();
+      try {
+        if (logWriter != null && System.currentTimeMillis() - logFileCreateTime > logFileValidity) {
+          // Submit logFile when it's expired
+          submitLogFile();
+        }
+      } finally {
+        logWriterLock.unlock();
+      }
+
+      try {
+        // Sleep 10s before next check
+        TimeUnit.SECONDS.sleep(10);
+      } catch (InterruptedException e) {
+        LOGGER.error("OperationSyncLogService been interrupted", e);
+      }
+    }
+  }
+
+  private void submitLogFile() {
+    try {
+      logWriter.force();
+    } catch (IOException e) {
+      LOGGER.error("Can't force logWrite", e);
+    }
+    incLogFileSize(logFile.length());
+
+    for (int retry = 0; retry < 5; retry++) {
+      try {
+        logWriter.close();
+      } catch (IOException e) {
+        LOGGER.warn("Can't close OperationSyncLog: {}, retrying...", logFile.getAbsolutePath());
+        try {
+          // Sleep 1s and retry
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException ignored) {
+          // Ignore and retry
+        }
+        continue;
+      }
+
+      LOGGER.info("OperationSyncLog: {} is expired and closed", logFile.getAbsolutePath());
+      break;
+    }
+
+    protector.registerLogFile(
+        logFileDir
+            + File.separator
+            + logFileName
+            + (logFileID - 1 + maxLogFileNum) % maxLogFileNum);
+
+    logWriter = null;
+    logFile = null;
+  }
+
+  private void createLogFile() {
+    logFile =
+        SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + logFileName + logFileID);
+    while (true) {
+      try {
+        if (logFile.createNewFile()) {
+          logFileCreateTime = System.currentTimeMillis();
+          logWriter = new LogWriter(logFile, false);
+          LOGGER.info("Create OperationSyncLog: {}", logFile.getAbsolutePath());
+          break;
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Can't create OperationSyncLog: {}, retrying...", logFile.getAbsolutePath());
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException ignored) {
+          // Ignore and retry
+        }
+      }
+    }
+    logFileID = (logFileID + 1) % maxLogFileNum;
+  }
+
+  public static synchronized void incLogFileSize(long size) {
+    currentLogFileSize = currentLogFileSize + size;
+  }
+
+  public void acquireLogWriter() {
+    logWriterLock.lock();
+  }
+
+  public void write(ByteBuffer buffer) throws IOException {
+    if (currentLogFileSize < maxLogFileSize) {
+      if (logWriter == null) {
+        // Create logFile when there are no valid
+        createLogFile();
+      }
+      logWriter.write(buffer);
+    } else {
+      LOGGER.error("The OperationSyncLog is full, new PhysicalPlans will be discarded.");
+    }
+  }
+
+  public void releaseLogWriter() {
+    logWriterLock.unlock();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
new file mode 100644
index 0000000000..a3e21fe5a0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+
+public class OperationSyncPlanTypeUtils {
+
+  public static OperationSyncPlanType getOperationSyncPlanType(PhysicalPlan plan) {
+    if (plan instanceof SetStorageGroupPlan
+        || plan instanceof DeleteStorageGroupPlan
+        || plan instanceof CreateTimeSeriesPlan
+        || plan instanceof CreateMultiTimeSeriesPlan
+        || plan instanceof CreateAlignedTimeSeriesPlan
+        || plan instanceof DeleteTimeSeriesPlan
+        || plan instanceof AlterTimeSeriesPlan) {
+      return OperationSyncPlanType.DDLPlan;
+    } else if (plan instanceof DeletePlan || plan instanceof InsertPlan) {
+      return OperationSyncPlanType.DMLPlan;
+    }
+    return null;
+  }
+
+  public enum OperationSyncPlanType {
+    DDLPlan, // Create, update and delete schema
+    DMLPlan // insert and delete data
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
new file mode 100644
index 0000000000..2f23b97e05
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * OperationSyncProducer using BlockingQueue to cache PhysicalPlan. And persist some PhysicalPlan
+ * when they are too many to transmit
+ */
+public class OperationSyncProducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProducer.class);
+
+  private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+      operationSyncQueue;
+
+  public OperationSyncProducer(
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          operationSyncQueue) {
+    this.operationSyncQueue = operationSyncQueue;
+  }
+
+  public void put(Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair) {
+    try {
+      planPair.left.position(0);
+      operationSyncQueue.put(planPair);
+    } catch (InterruptedException e) {
+      LOGGER.error("OperationSync cache failed.", e);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
new file mode 100644
index 0000000000..a9ff399c3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.writelog.io.SingleFileLogReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+public abstract class OperationSyncProtector implements Runnable {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProtector.class);
+  protected static final int logFileValidity =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity();
+
+  // For transmit log files
+  protected final Lock logFileListLock;
+  protected List<String> registeredLogFiles;
+  protected List<String> processingLogFiles;
+
+  // For serialize PhysicalPlan
+  private static final int MAX_PHYSICALPLAN_SIZE = 16 * 1024 * 1024;
+  protected final ByteArrayOutputStream protectorByteStream;
+  protected final DataOutputStream protectorDeserializeStream;
+
+  // Working state
+  protected volatile boolean isProtectorAtWork;
+
+  protected OperationSyncProtector() {
+    logFileListLock = new ReentrantLock();
+    registeredLogFiles = new ArrayList<>();
+
+    protectorByteStream = new ByteArrayOutputStream(MAX_PHYSICALPLAN_SIZE);
+    protectorDeserializeStream = new DataOutputStream(protectorByteStream);
+
+    isProtectorAtWork = false;
+  }
+
+  protected void registerLogFile(String logFile) {
+    logFileListLock.lock();
+    try {
+      registeredLogFiles.add(logFile);
+    } finally {
+      logFileListLock.unlock();
+    }
+  }
+
+  protected void wrapLogFiles() {
+    processingLogFiles = new ArrayList<>(registeredLogFiles);
+    registeredLogFiles = new ArrayList<>();
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      while (true) {
+        // Wrap and transmit all OperationSyncLogs
+        logFileListLock.lock();
+        try {
+          if (registeredLogFiles.size() > 0) {
+            isProtectorAtWork = true;
+            wrapLogFiles();
+          } else {
+            isProtectorAtWork = false;
+            break;
+          }
+        } finally {
+          logFileListLock.unlock();
+        }
+        if (isProtectorAtWork) {
+          transmitLogFiles();
+        }
+      }
+
+      try {
+        // Sleep a while before next check
+        TimeUnit.SECONDS.sleep(logFileValidity);
+      } catch (InterruptedException e) {
+        LOGGER.warn("OperationSyncProtector been interrupted", e);
+      }
+    }
+  }
+
+  protected void transmitLogFiles() {
+    preCheck();
+    for (String logFileName : processingLogFiles) {
+      File logFile = SystemFileFactory.INSTANCE.getFile(logFileName);
+      SingleFileLogReader logReader;
+      try {
+        logReader = new SingleFileLogReader(logFile);
+      } catch (FileNotFoundException e) {
+        LOGGER.error(
+            "OperationSyncProtector can't open OperationSyncLog: {}, discarded",
+            logFile.getAbsolutePath(),
+            e);
+        continue;
+      }
+
+      while (logReader.hasNext()) {
+        // read and re-serialize the PhysicalPlan
+        PhysicalPlan nextPlan = logReader.next();
+        try {
+          nextPlan.serialize(protectorDeserializeStream);
+        } catch (IOException e) {
+          LOGGER.error("OperationSyncProtector can't serialize PhysicalPlan", e);
+          continue;
+        }
+        ByteBuffer nextBuffer = ByteBuffer.wrap(protectorByteStream.toByteArray());
+        protectorByteStream.reset();
+        transmitPhysicalPlan(nextBuffer, nextPlan);
+      }
+
+      logReader.close();
+      try {
+        // sleep one second then delete OperationSyncLog
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        LOGGER.warn("OperationSyncProtector is interrupted", e);
+      }
+
+      OperationSyncLogService.incLogFileSize(-logFile.length());
+
+      boolean deleted = false;
+      for (int retryCnt = 0; retryCnt < 5; retryCnt++) {
+        if (logFile.delete()) {
+          deleted = true;
+          LOGGER.info("OperationSyncLog: {} is deleted.", logFile.getAbsolutePath());
+          break;
+        } else {
+          LOGGER.warn("Delete OperationSyncLog: {} failed. Retrying", logFile.getAbsolutePath());
+        }
+      }
+      if (!deleted) {
+        OperationSyncLogService.incLogFileSize(logFile.length());
+        LOGGER.error("Couldn't delete OperationSyncLog: {}", logFile.getAbsolutePath());
+      }
+    }
+  }
+
+  protected abstract void preCheck();
+
+  protected abstract void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
new file mode 100644
index 0000000000..e754be6173
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** OperationSyncWriteTask is used for transmit one E-Plan sending by a client */
+public class OperationSyncWriteTask implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncWriteTask.class);
+
+  private final ByteBuffer physicalPlanBuffer;
+  private final SessionPool operationSyncSessionPool;
+  private final OperationSyncDDLProtector ddlProtector;
+  private final OperationSyncLogService ddlLogService;
+
+  public OperationSyncWriteTask(
+      ByteBuffer physicalPlanBuffer,
+      SessionPool operationSyncSessionPool,
+      OperationSyncDDLProtector ddlProtector,
+      OperationSyncLogService ddlLogService) {
+    this.physicalPlanBuffer = physicalPlanBuffer;
+    this.operationSyncSessionPool = operationSyncSessionPool;
+    this.ddlProtector = ddlProtector;
+    this.ddlLogService = ddlLogService;
+  }
+
+  @Override
+  public void run() {
+    if (ddlProtector.isAtWork()) {
+      serializeEPlan();
+    } else {
+      boolean transmitStatus = false;
+      try {
+        physicalPlanBuffer.position(0);
+        transmitStatus = operationSyncSessionPool.operationSyncTransmit(physicalPlanBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and do serialization
+        LOGGER.warn(
+            "OperationSyncWriteTask can't transmit because network failure", connectionException);
+      } catch (Exception e) {
+        // The PhysicalPlan has internal error, reject transmit
+        LOGGER.error("OperationSyncWriteTask can't transmit", e);
+        return;
+      }
+      if (!transmitStatus) {
+        serializeEPlan();
+      }
+    }
+  }
+
+  private void serializeEPlan() {
+    // serialize the E-Plan if necessary
+    try {
+      // must set buffer position to limit() before serialization
+      physicalPlanBuffer.position(physicalPlanBuffer.limit());
+      ddlLogService.acquireLogWriter();
+      ddlLogService.write(physicalPlanBuffer);
+    } catch (IOException e) {
+      LOGGER.error("can't serialize current PhysicalPlan", e);
+    } finally {
+      ddlLogService.releaseLogWriter();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3f9dce62b3..a556bc68d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,6 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
+import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
 import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
 import org.apache.iotdb.db.exception.IoTDBException;
 import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -109,6 +116,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
 import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
 import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -120,6 +128,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
 import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.pool.SessionPool;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -127,11 +136,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.sql.SQLException;
@@ -142,6 +154,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -302,9 +316,63 @@ public class TSServiceImpl implements TSIService.Iface {
 
   protected final ServiceProvider serviceProvider;
 
+  /* OperationSync module */
+  private static final boolean isEnableOperationSync =
+      IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+  private final SessionPool operationSyncsessionPool;
+  private final OperationSyncProducer operationSyncProducer;
+  private final OperationSyncDDLProtector operationSyncDDLProtector;
+  private final OperationSyncLogService operationSyncDDLLogService;
+
   public TSServiceImpl() {
     super();
     serviceProvider = IoTDB.serviceProvider;
+
+    if (isEnableOperationSync) {
+      /* Open OperationSync */
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      // create SessionPool for OperationSync
+      operationSyncsessionPool =
+          new SessionPool(
+              config.getSecondaryAddress(),
+              config.getSecondaryPort(),
+              config.getSecondaryUser(),
+              config.getSecondaryPassword(),
+              5);
+
+      // create operationSyncDDLProtector and operationSyncDDLLogService
+      operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+      new Thread(operationSyncDDLProtector).start();
+      operationSyncDDLLogService =
+          new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+      new Thread(operationSyncDDLLogService).start();
+
+      // create OperationSyncProducer
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+      operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+      // create OperationSyncDMLProtector and OperationSyncDMLLogService
+      OperationSyncDMLProtector operationSyncDMLProtector =
+          new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+      new Thread(operationSyncDMLProtector).start();
+      OperationSyncLogService operationSyncDMLLogService =
+          new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+      new Thread(operationSyncDMLLogService).start();
+
+      // create OperationSyncConsumer
+      for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+        OperationSyncConsumer consumer =
+            new OperationSyncConsumer(
+                blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
+        new Thread(consumer).start();
+      }
+    } else {
+      operationSyncsessionPool = null;
+      operationSyncProducer = null;
+      operationSyncDDLProtector = null;
+      operationSyncDDLLogService = null;
+    }
   }
 
   @Override
@@ -1484,7 +1552,12 @@ public class TSServiceImpl implements TSIService.Iface {
               req.values,
               req.isAligned);
       TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
-      return status != null ? status : executeNonQueryPlan(plan);
+
+      if (status != null) {
+        return status;
+      }
+
+      return executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode());
     } catch (Exception e) {
@@ -1515,7 +1588,12 @@ public class TSServiceImpl implements TSIService.Iface {
       plan.setNeedInferType(true);
       plan.setAligned(req.isAligned);
       TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
-      return status != null ? status : executeNonQueryPlan(plan);
+
+      if (status != null) {
+        return status;
+      }
+
+      return executeNonQueryPlan(plan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode());
     } catch (Exception e) {
@@ -1571,7 +1649,11 @@ public class TSServiceImpl implements TSIService.Iface {
       insertTabletPlan.setAligned(req.isAligned);
       TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
 
-      return status != null ? status : executeNonQueryPlan(insertTabletPlan);
+      if (status != null) {
+        return status;
+      }
+
+      return executeNonQueryPlan(insertTabletPlan);
     } catch (IoTDBException e) {
       return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode());
     } catch (Exception e) {
@@ -2057,7 +2139,82 @@ public class TSServiceImpl implements TSIService.Iface {
     return status != null ? status : executeNonQueryPlan(plan);
   }
 
+  @Override
+  public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
+    PhysicalPlan physicalPlan;
+    try {
+      ByteBuffer planBuffer = req.physicalPlan;
+      planBuffer.position(0);
+      physicalPlan = PhysicalPlan.Factory.create(req.physicalPlan);
+    } catch (IllegalPathException | IOException e) {
+      LOGGER.error("OperationSync deserialization failed.", e);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+          .setMessage(e.getMessage());
+    }
+
+    OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+        OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+    if (planType == null) {
+      LOGGER.error(
+          "OperationSync receive unsupported PhysicalPlan type: {}",
+          physicalPlan.getOperatorName());
+      return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    }
+
+    try {
+      return serviceProvider.executeNonQuery(physicalPlan)
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
+          : RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+    } catch (Exception e) {
+      return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
+    }
+  }
+
+  private void transmitOperationSync(PhysicalPlan physicalPlan) {
+
+    OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+        OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+    if (planType == null) {
+      // Don't need OperationSync
+      return;
+    }
+
+    // serialize physical plan
+    ByteBuffer buffer;
+    try {
+      int size = physicalPlan.getSerializedSize();
+      ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
+      DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
+      physicalPlan.serialize(operationSyncSerializeStream);
+      buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
+    } catch (IOException e) {
+      LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
+      return;
+    }
+
+    switch (planType) {
+      case DDLPlan:
+        // Create OperationSyncWriteTask and wait
+        OperationSyncWriteTask ddlTask =
+            new OperationSyncWriteTask(
+                buffer,
+                operationSyncsessionPool,
+                operationSyncDDLProtector,
+                operationSyncDDLLogService);
+        ddlTask.run();
+        break;
+      case DMLPlan:
+        // Put into OperationSyncProducer
+        operationSyncProducer.put(new Pair<>(buffer, planType));
+    }
+  }
+
   protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+    if (isEnableOperationSync) {
+      // OperationSync should transmit before execute
+      transmitOperationSync(plan);
+    }
+
     try {
       return serviceProvider.executeNonQuery(plan)
           ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
diff --git a/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java b/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
new file mode 100644
index 0000000000..3b110ebc8b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This is a manual test utils for DoubleLive. First start two IoTDB that enable OperationSync to
+ * use this.
+ */
+public class OperationSyncManualTestUtils {
+
+  private static final SessionPool sessionPool =
+      new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+
+  private static final String sg = "root.sg";
+  private static final int sgCnt = 10;
+  private static final String d = ".d";
+  private static final int dCnt = 20;
+  private static final String s = ".s";
+  private static final int sCnt = 100;
+  private static final int dataCnt = 1000;
+
+  public void setStorageGroups() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      sessionPool.setStorageGroup(sg + i);
+    }
+  }
+
+  public void deleteStorageGroups() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      sessionPool.deleteStorageGroups(Collections.singletonList(sg + i));
+    }
+  }
+
+  public void createTimeSeries() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        for (int k = 0; k < sCnt; k++) {
+          String S = s + k;
+          sessionPool.createTimeseries(
+              SG + D + S, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED);
+        }
+      }
+    }
+  }
+
+  public void deleteTimeSeries() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        for (int k = 0; k < sCnt; k++) {
+          String S = s + k;
+          sessionPool.deleteTimeseries(SG + D + S);
+        }
+      }
+    }
+  }
+
+  public void insertData() throws IoTDBConnectionException, StatementExecutionException {
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        String device = SG + D;
+        List<String> measurements = new ArrayList<>();
+        List<TSDataType> types = new ArrayList<>();
+        for (int k = 0; k < sCnt; k++) {
+          measurements.add("s" + k);
+          types.add(TSDataType.INT32);
+        }
+        for (int l = 0; l < dataCnt; l++) {
+          List<Object> values = new ArrayList<>();
+          for (int k = 0; k < sCnt; k++) {
+            values.add(l);
+          }
+          sessionPool.insertRecord(device, l, measurements, types, values);
+        }
+      }
+    }
+    long endTime = System.currentTimeMillis();
+    System.out.println(
+        "Avg time per insert: "
+            + ((endTime - startTime) / (double) (sgCnt + dCnt + dataCnt))
+            + "ms");
+  }
+
+  public void deleteData() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        for (int k = 0; k < sCnt; k++) {
+          String S = s + k;
+          sessionPool.deleteData(Collections.singletonList(SG + D + S), 0, dataCnt);
+        }
+      }
+    }
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index b7bf9d9185..cb483c96ab 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
 import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
@@ -2460,6 +2461,24 @@ public class Session {
     }
   }
 
+  /** Transmit insert record request for operation sync */
+  public void operationSyncTransmit(ByteBuffer buffer)
+      throws IoTDBConnectionException, StatementExecutionException {
+    try {
+      TSOperationSyncWriteReq request = genTSExecuteOperationSyncReq(buffer);
+      defaultSessionConnection.executeOperationSync(request);
+    } catch (RedirectException e) {
+      // ignored
+    }
+  }
+
+  private TSOperationSyncWriteReq genTSExecuteOperationSyncReq(ByteBuffer buffer) {
+    TSOperationSyncWriteReq request = new TSOperationSyncWriteReq();
+    request.setOperationSyncType((byte) 0);
+    request.setPhysicalPlan(buffer);
+    return request;
+  }
+
   public boolean isEnableQueryRedirection() {
     return enableQueryRedirection;
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 930f5baaa1..71c1f28c48 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
 import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
 import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
@@ -942,6 +943,25 @@ public class SessionConnection {
     }
   }
 
+  protected void executeOperationSync(TSOperationSyncWriteReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    request.setSessionId(sessionId);
+    try {
+      RpcUtils.verifySuccessWithRedirection(client.executeOperationSync(request));
+    } catch (TException e) {
+      if (reconnect()) {
+        try {
+          request.setSessionId(sessionId);
+          RpcUtils.verifySuccess(client.executeOperationSync(request));
+        } catch (TException tException) {
+          throw new IoTDBConnectionException(tException);
+        }
+      } else {
+        throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+      }
+    }
+  }
+
   public boolean isEnableRedirect() {
     return enableRedirect;
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 37249b9971..9fa6ff9646 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
@@ -2257,6 +2258,28 @@ public class SessionPool {
     return null;
   }
 
+  /** Transmit insert record request for OperationSync */
+  public boolean operationSyncTransmit(ByteBuffer buffer)
+      throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < RETRY; i++) {
+      Session session = getSession();
+      try {
+        buffer.position(0);
+        session.operationSyncTransmit(buffer);
+        putBack(session);
+        return true;
+      } catch (IoTDBConnectionException e) {
+        // TException means the connection is broken, remove it and get a new one.
+        cleanSessionAndMayThrowConnectionException(session, i, e);
+      } catch (StatementExecutionException | RuntimeException e) {
+        putBack(session);
+        throw e;
+      }
+    }
+
+    return false;
+  }
+
   public int getMaxSize() {
     return maxSize;
   }
diff --git a/thrift/rpc-changelist.md b/thrift/rpc-changelist.md
index 43570d8f52..f3095478f1 100644
--- a/thrift/rpc-changelist.md
+++ b/thrift/rpc-changelist.md
@@ -30,14 +30,15 @@ Last Updated on 2022.1.17 by Xin Zhao.
 
 ## 2. Add New
 
-| Latest Changes                                                                                                                                                                                                                                                                                                                                        | Related Committers |
-|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|
-| Add TSTracingInfo                                                                                                                                                                                                                                                                                                                                     | Minghui Liu        |
+| Latest Changes                                                                                                                                                                                                                                                                                                                                          | Related Committers |
+|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|
+| Add TSTracingInfo                                                                                                                                                                                                                                                                                                                                       | Minghui Liu        |
 | Add structs and interfaces to append, prune, query and unset Schema Template (detail: TSAppendSchemaTemplateReq, TSPruneSchemaTemplateReq, TSQueryTemplateReq, TSQueryTemplateResp, TSUnsetSchemaTemplateReq, appendSchemaTemplate, pruneSchemaTemplate, querySchemaTemplate, unsetSchemaTemplate), and serializedTemplate in TSCreateSchemaTemplateReq | Xin Zhao           |
-| Add struct TSInsertStringRecordsOfOneDeviceReq                                                                                                                                                                                                                                                                                                        | Hang Zhang         |
-| Add method TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req)                                                                                                                                                                                                                                                         | Hang Zhang         |
-| Add TSDropSchemaTemplateReq, TSStatus dropSchemaTemplate                                                                                                                                                                                                                                                                                              | Xin Zhao           |
+| Add struct TSInsertStringRecordsOfOneDeviceReq                                                                                                                                                                                                                                                                                                          | Hang Zhang         |
+| Add method TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req)                                                                                                                                                                                                                                                           | Hang Zhang         |
+| Add TSDropSchemaTemplateReq, TSStatus dropSchemaTemplate                                                                                                                                                                                                                                                                                                | Xin Zhao           |
 | Add TSCreateAlignedTimeseriesReq                                                                                                                                                                                                                                                                                                                        | Haonan Hou         |
+| Add TSOperationSyncWriteReq                                                                                                                                                                                                                                                                                                                             | Rongzhao Chen      |
 
 ## 3. Update
 
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 78ccce6db6..0f699b4684 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -416,6 +416,12 @@ struct TSDropSchemaTemplateReq {
   2: required string templateName
 }
 
+struct TSOperationSyncWriteReq {
+  1: required i64 sessionId
+  2: required byte operationSyncType
+  3: required binary physicalPlan
+}
+
 service TSIService {
   TSOpenSessionResp openSession(1:TSOpenSessionReq req);
 
@@ -506,4 +512,6 @@ service TSIService {
   TSStatus unsetSchemaTemplate(1:TSUnsetSchemaTemplateReq req);
 
   TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
+
+  TSStatus executeOperationSync(1:TSOperationSyncWriteReq req);
 }
\ No newline at end of file