You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/01 15:45:29 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13][IOTDB-2893] Double live for 0.13 (#5574)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 5692bdf6ed [To rel/0.13][IOTDB-2893] Double live for 0.13 (#5574)
5692bdf6ed is described below
commit 5692bdf6ed54a928267acbe41b66ed53824126a7
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Sun May 1 23:45:23 2022 +0800
[To rel/0.13][IOTDB-2893] Double live for 0.13 (#5574)
---
.../apache/iotdb/session/SessionCacheLeaderUT.java | 0
.../apache/iotdb/session/util/ThreadUtilsTest.java | 0
server/pom.xml | 5 +
.../resources/conf/iotdb-engine.properties | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 124 ++++++++++++
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 3 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 56 ++++++
.../iotdb/db/doublelive/OperationSyncConsumer.java | 94 +++++++++
.../db/doublelive/OperationSyncDDLProtector.java | 81 ++++++++
.../db/doublelive/OperationSyncDMLProtector.java | 55 ++++++
.../db/doublelive/OperationSyncLogService.java | 219 +++++++++++++++++++++
.../db/doublelive/OperationSyncPlanTypeUtils.java | 53 +++++
.../iotdb/db/doublelive/OperationSyncProducer.java | 54 +++++
.../db/doublelive/OperationSyncProtector.java | 174 ++++++++++++++++
.../db/doublelive/OperationSyncWriteTask.java | 87 ++++++++
.../db/service/thrift/impl/TSServiceImpl.java | 163 ++++++++++++++-
.../doublelive/OperationSyncManualTestUtils.java | 129 ++++++++++++
.../java/org/apache/iotdb/session/Session.java | 19 ++
.../apache/iotdb/session/SessionConnection.java | 20 ++
.../org/apache/iotdb/session/pool/SessionPool.java | 23 +++
thrift/rpc-changelist.md | 13 +-
thrift/src/main/thrift/rpc.thrift | 8 +
22 files changed, 1372 insertions(+), 10 deletions(-)
diff --git a/session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java b/integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
similarity index 100%
rename from session/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
rename to integration/src/test/java/org/apache/iotdb/session/SessionCacheLeaderUT.java
diff --git a/session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java b/integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
similarity index 100%
rename from session/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
rename to integration/src/test/java/org/apache/iotdb/session/util/ThreadUtilsTest.java
diff --git a/server/pom.xml b/server/pom.xml
index 0dfccd31ec..fc9cc02630 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -45,6 +45,11 @@
<artifactId>iotdb-antlr</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-session</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>tsfile</artifactId>
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index f8690c3586..bbfbb0be45 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -931,4 +931,4 @@ timestamp_precision=ms
### Group By Fill Configuration
####################
# Datatype: float
-# group_by_fill_cache_size_in_mb=1.0
+# group_by_fill_cache_size_in_mb=1.0
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1bf37275b4..a64066b00d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -820,6 +820,33 @@ public class IoTDBConfig {
/** Encryption provided class parameter */
private String encryptDecryptProviderParameter;
+ // Operation Sync Config: master switch, disabled by default
+ private boolean enableOperationSync = false;
+
+ // Connection settings of the secondary IoTDB that operations are transmitted to
+ private String secondaryAddress = "127.0.0.1";
+ private int secondaryPort = 6668;
+ private String secondaryUser = "root";
+ private String secondaryPassword = "root";
+
+ // The transmitting concurrency size of operation sync SessionPool
+ // NOTE(review): field name starts with an upper-case letter, unlike its siblings; renaming it
+ // also requires touching its getter and setter
+ private int OperationSyncSessionConcurrencySize = 8;
+
+ // OperationSyncLog dir
+ private String operationSyncLogDir =
+ DEFAULT_BASE_DIR + File.separator + IoTDBConstant.OPERATION_SYNC_FOLDER_NAME;
+ // The validity of each OperationSyncLog, in seconds (OperationSyncLogService multiplies it by
+ // 1000 before comparing against millisecond timestamps)
+ private int operationSyncLogValidity = 30;
+ // The number of OperationSyncLog id slots: OperationSyncLogService uses ids in
+ // [0, operationSyncLogNum) and wraps around with a modulo
+ private int operationSyncLogNum = 32767;
+ // The max size of all the OperationSyncLog, in bytes. Default is 100GB
+ private long operationSyncMaxLogSize = 107374182400L;
+
+ // OperationSyncProducer DML cache size
+ private int operationSyncProducerCacheSize = 1024;
+ // OperationSyncConsumer concurrency size
+ private int operationSyncConsumerConcurrencySize = 4;
+
+
public IoTDBConfig() {
// empty constructor
}
@@ -953,6 +980,7 @@ public class IoTDBConfig {
extDir = addHomeDir(extDir);
udfDir = addHomeDir(udfDir);
triggerDir = addHomeDir(triggerDir);
+ operationSyncLogDir = addHomeDir(operationSyncLogDir);
if (TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS)) {
String hdfsDir = getHdfsDir();
@@ -2565,4 +2593,100 @@ public class IoTDBConfig {
public void setEncryptDecryptProviderParameter(String encryptDecryptProviderParameter) {
this.encryptDecryptProviderParameter = encryptDecryptProviderParameter;
}
+
+ // ---- Operation Sync accessors: plain getters/setters, no validation is performed ----
+
+ /** Whether operation sync is switched on. */
+ public boolean isEnableOperationSync() {
+ return enableOperationSync;
+ }
+
+ public void setEnableOperationSync(boolean enableOperationSync) {
+ this.enableOperationSync = enableOperationSync;
+ }
+
+ /** Address of the secondary IoTDB. */
+ public String getSecondaryAddress() {
+ return secondaryAddress;
+ }
+
+ public void setSecondaryAddress(String secondaryAddress) {
+ this.secondaryAddress = secondaryAddress;
+ }
+
+ /** Port of the secondary IoTDB. */
+ public int getSecondaryPort() {
+ return secondaryPort;
+ }
+
+ public void setSecondaryPort(int secondaryPort) {
+ this.secondaryPort = secondaryPort;
+ }
+
+ /** User name used to log in to the secondary IoTDB. */
+ public String getSecondaryUser() {
+ return secondaryUser;
+ }
+
+ public void setSecondaryUser(String secondaryUser) {
+ this.secondaryUser = secondaryUser;
+ }
+
+ /** Password used to log in to the secondary IoTDB. */
+ public String getSecondaryPassword() {
+ return secondaryPassword;
+ }
+
+ public void setSecondaryPassword(String secondaryPassword) {
+ this.secondaryPassword = secondaryPassword;
+ }
+
+ /** Transmitting concurrency size of the operation sync SessionPool. */
+ public int getOperationSyncSessionConcurrencySize() {
+ return OperationSyncSessionConcurrencySize;
+ }
+
+ public void setOperationSyncSessionConcurrencySize(int operationSyncSessionConcurrencySize) {
+ this.OperationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
+ }
+
+ /** Directory that holds the OperationSyncLog files. */
+ public String getOperationSyncLogDir() {
+ return operationSyncLogDir;
+ }
+
+ public void setOperationSyncLogDir(String operationSyncLogDir) {
+ this.operationSyncLogDir = operationSyncLogDir;
+ }
+
+ /** Validity of each OperationSyncLog; OperationSyncLogService treats the value as seconds. */
+ public int getOperationSyncLogValidity() {
+ return operationSyncLogValidity;
+ }
+
+ public void setOperationSyncLogValidity(int operationSyncLogValidity) {
+ this.operationSyncLogValidity = operationSyncLogValidity;
+ }
+
+ /** Number of OperationSyncLog id slots; ids wrap around modulo this value. */
+ public int getOperationSyncLogNum() {
+ return operationSyncLogNum;
+ }
+
+ public void setOperationSyncLogNum(int operationSyncLogNum) {
+ this.operationSyncLogNum = operationSyncLogNum;
+ }
+
+ /** Upper bound, in bytes, on the accumulated OperationSyncLog size. */
+ public long getOperationSyncMaxLogSize() {
+ return operationSyncMaxLogSize;
+ }
+
+ public void setOperationSyncMaxLogSize(long operationSyncMaxLogSize) {
+ this.operationSyncMaxLogSize = operationSyncMaxLogSize;
+ }
+
+ /** OperationSyncProducer DML cache size. */
+ public int getOperationSyncProducerCacheSize() {
+ return operationSyncProducerCacheSize;
+ }
+
+ public void setOperationSyncProducerCacheSize(int operationSyncProducerCacheSize) {
+ this.operationSyncProducerCacheSize = operationSyncProducerCacheSize;
+ }
+
+ /** OperationSyncConsumer concurrency size. */
+ public int getOperationSyncConsumerConcurrencySize() {
+ return operationSyncConsumerConcurrencySize;
+ }
+
+ public void setOperationSyncConsumerConcurrencySize(int operationSyncConsumerConcurrencySize) {
+ this.operationSyncConsumerConcurrencySize = operationSyncConsumerConcurrencySize;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index feab1abd37..21228f3cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -154,6 +154,9 @@ public class IoTDBConstant {
public static final String UDF_FOLDER_NAME = "udf";
public static final String TRIGGER_FOLDER_NAME = "trigger";
+ // Operation Sync folder name
+ public static final String OPERATION_SYNC_FOLDER_NAME = "operationsync";
+
// mqtt
public static final String ENABLE_MQTT = "enable_mqtt_service";
public static final String MQTT_HOST_NAME = "mqtt_host";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 6f264adaf1..2e86b8ff86 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -790,6 +790,62 @@ public class IoTDBDescriptor {
"iotdb_server_encrypt_decrypt_provider_parameter",
conf.getEncryptDecryptProviderParameter()));
+ // set OperationSync config
+ conf.setEnableOperationSync(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_operation_sync", String.valueOf(conf.isEnableOperationSync()))));
+
+ conf.setSecondaryAddress(
+ properties.getProperty("secondary_address", conf.getSecondaryAddress()));
+
+ conf.setSecondaryPort(
+ Integer.parseInt(
+ properties.getProperty("secondary_port", String.valueOf(conf.getSecondaryPort()))));
+
+ conf.setSecondaryUser(properties.getProperty("secondary_user", conf.getSecondaryUser()));
+
+ conf.setSecondaryPassword(
+ properties.getProperty("secondary_password", conf.getSecondaryPassword()));
+
+ conf.setOperationSyncSessionConcurrencySize(
+ Integer.parseInt(
+ properties.getProperty(
+ "operation_sync_session_concurrency_size",
+ String.valueOf(conf.getOperationSyncSessionConcurrencySize()))));
+
+ conf.setOperationSyncLogDir(
+ properties.getProperty("operation_sync_log_dir", conf.getOperationSyncLogDir()));
+
+ conf.setOperationSyncLogValidity(
+ Integer.parseInt(
+ properties.getProperty(
+ "operation_sync_log_file_validity",
+ String.valueOf(conf.getOperationSyncLogValidity()))));
+
+ conf.setOperationSyncLogNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "operation_sync_log_file_num", String.valueOf(conf.getOperationSyncLogNum()))));
+
+ conf.setOperationSyncMaxLogSize(
+ Long.parseLong(
+ properties.getProperty(
+ "operation_sync_max_log_size",
+ String.valueOf(conf.getOperationSyncMaxLogSize()))));
+
+ conf.setOperationSyncProducerCacheSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "operation_sync_producer_cache_size",
+ String.valueOf(conf.getOperationSyncProducerCacheSize()))));
+
+ conf.setOperationSyncConsumerConcurrencySize(
+ Integer.parseInt(
+ properties.getProperty(
+ "operation_sync_consumer_concurrency_size",
+ String.valueOf(conf.getOperationSyncConsumerConcurrencySize()))));
+
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
.getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
new file mode 100644
index 0000000000..3f05766d53
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Background worker that takes serialized plans off the operation-sync queue and transmits them
+ * to the secondary IoTDB through the shared {@link SessionPool}.
+ *
+ * <p>When a transmission does not succeed (connection failure or a {@code false} status) the plan
+ * is appended to the DML OperationSyncLog. A plan that fails with any other exception is logged
+ * and dropped. The worker stops when its thread is interrupted while waiting on the queue.
+ */
+public class OperationSyncConsumer implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
+
+  private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+      operationSyncQueue;
+  private final SessionPool operationSyncSessionPool;
+  private final OperationSyncLogService dmlLogService;
+
+  /**
+   * @param operationSyncQueue queue the serialized plans are taken from
+   * @param operationSyncSessionPool session pool used to transmit plans to the secondary IoTDB
+   * @param dmlLogService log service that persists plans which could not be transmitted
+   */
+  public OperationSyncConsumer(
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          operationSyncQueue,
+      SessionPool operationSyncSessionPool,
+      OperationSyncLogService dmlLogService) {
+    this.operationSyncQueue = operationSyncQueue;
+    this.operationSyncSessionPool = operationSyncSessionPool;
+    this.dmlLogService = dmlLogService;
+  }
+
+  @Override
+  public void run() {
+    while (true) {
+      ByteBuffer headBuffer;
+      try {
+        headBuffer = operationSyncQueue.take().left;
+      } catch (InterruptedException e) {
+        // Interruption is the only way to stop this worker: restore the flag and exit instead of
+        // going back to a blocking take() that would hide the request
+        LOGGER.error("OperationSyncConsumer been interrupted: ", e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+
+      boolean transmitStatus = false;
+      try {
+        headBuffer.position(0);
+        transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // warn IoTDBConnectionException and do serialization
+        LOGGER.warn(
+            "OperationSyncConsumer can't transmit because network failure", connectionException);
+      } catch (Exception e) {
+        // The PhysicalPlan has internal error, reject transmit
+        LOGGER.error("OperationSyncConsumer can't transmit", e);
+        continue;
+      }
+
+      if (!transmitStatus) {
+        persist(headBuffer);
+      }
+    }
+  }
+
+  /** Appends a plan that could not be transmitted to the DML OperationSyncLog. */
+  private void persist(ByteBuffer planBuffer) {
+    // Take the lock before entering try so that finally never releases a lock that is not held
+    dmlLogService.acquireLogWriter();
+    try {
+      // must set buffer position to limit() before serialization
+      planBuffer.position(planBuffer.limit());
+      dmlLogService.write(planBuffer);
+    } catch (IOException e) {
+      LOGGER.error("OperationSyncConsumer can't serialize physicalPlan", e);
+    } finally {
+      dmlLogService.releaseLogWriter();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
new file mode 100644
index 0000000000..b2e70c5042
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Protector for schema (DDL) plans: each plan is pushed to the secondary IoTDB synchronously and
+ * re-sent once per second for as long as the transmission is not acknowledged.
+ */
+public class OperationSyncDDLProtector extends OperationSyncProtector {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);
+
+  private final SessionPool operationSyncSessionPool;
+
+  public OperationSyncDDLProtector(SessionPool operationSyncSessionPool) {
+    super();
+    this.operationSyncSessionPool = operationSyncSessionPool;
+  }
+
+  /** DDL plans have no precondition to wait for. */
+  @Override
+  protected void preCheck() {
+    // do nothing
+  }
+
+  /**
+   * Transmits the serialized plan until the secondary reports success. A connection failure or a
+   * negative status leads to a retry after one second; any other exception aborts the plan.
+   */
+  @Override
+  protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+    boolean received = false;
+    while (!received) {
+      try {
+        // rewind so that every attempt sends the whole plan
+        planBuffer.position(0);
+        received = operationSyncSessionPool.operationSyncTransmit(planBuffer);
+      } catch (IoTDBConnectionException connectionException) {
+        // connection problems are considered transient: fall through to the retry delay
+        LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+      } catch (Exception e) {
+        // anything else cannot be fixed by retrying, give up on this plan
+        LOGGER.error("OperationSyncDDLProtector can't transmit", e);
+        return;
+      }
+
+      if (!received) {
+        try {
+          TimeUnit.SECONDS.sleep(1);
+        } catch (InterruptedException e) {
+          LOGGER.warn("OperationSyncDDLProtector is interrupted", e);
+        }
+      }
+    }
+  }
+
+  /** Exposes the inherited {@code isProtectorAtWork} flag. */
+  public boolean isAtWork() {
+    return isProtectorAtWork;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
new file mode 100644
index 0000000000..5668d918f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+public class OperationSyncDMLProtector extends OperationSyncProtector {
+
+ private final OperationSyncDDLProtector ddlProtector;
+ private final OperationSyncProducer producer;
+
+ public OperationSyncDMLProtector(
+ OperationSyncDDLProtector ddlProtector, OperationSyncProducer producer) {
+ super();
+ this.ddlProtector = ddlProtector;
+ this.producer = producer;
+ }
+
+ @Override
+ protected void preCheck() {
+ while (ddlProtector.isAtWork()) {
+ try {
+ TimeUnit.SECONDS.sleep(5);
+ } catch (InterruptedException ignore) {
+ // ignore and retry
+ }
+ }
+ }
+
+ @Override
+ protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
+ producer.put(
+ new Pair<>(planBuffer, OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan)));
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
new file mode 100644
index 0000000000..a11d3fee28
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.writelog.io.LogWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Manages the OperationSyncLog files of one log family (all files share the {@code logFileName}
+ * prefix and are numbered with an id in {@code [0, maxLogFileNum)}).
+ *
+ * <p>As a {@link Runnable} it first registers log files left on disk with the protector, then
+ * periodically closes the current log once it is older than the configured validity and registers
+ * it with the protector as well. Writers must bracket {@link #write(ByteBuffer)} with {@link
+ * #acquireLogWriter()} and {@link #releaseLogWriter()}; the expiry check takes the same lock.
+ */
+public class OperationSyncLogService implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncLogService.class);
+
+  private static final String logFileDir =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogDir();
+  // The configured validity is multiplied by 1000 to compare it with millisecond timestamps
+  private static final long logFileValidity =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity() * 1000L;
+  private static final int maxLogFileNum =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogNum();
+  private static final long maxLogFileSize =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncMaxLogSize();
+
+  // Shared by all instances. Updated under the class lock in incLogFileSize() but read without it
+  // in write(), so it has to be volatile for the readers to see the latest value.
+  private static volatile long currentLogFileSize = 0;
+
+  private final OperationSyncProtector protector;
+  private final Lock logWriterLock;
+  private final String logFileName;
+  // Id the next created log file will get
+  private int logFileID;
+  private long logFileCreateTime;
+  private File logFile;
+  private LogWriter logWriter;
+
+  /**
+   * @param logFileName file name prefix of this log family
+   * @param protector receiver of the log files that are ready to be processed
+   */
+  public OperationSyncLogService(String logFileName, OperationSyncProtector protector) {
+    this.logFileName = logFileName;
+    this.protector = protector;
+
+    this.logWriterLock = new ReentrantLock();
+    this.logFile = null;
+    this.logWriter = null;
+
+    File logDir = new File(logFileDir);
+    if (!logDir.exists() && !logDir.mkdirs()) {
+      LOGGER.error("Can't make OperationSyncLog file dir: {}", logDir.getAbsolutePath());
+    }
+  }
+
+  @Override
+  public void run() {
+    recoverRemnantLogs();
+
+    while (true) {
+      // Check the validity of logFile
+      logWriterLock.lock();
+      try {
+        if (logWriter != null && System.currentTimeMillis() - logFileCreateTime > logFileValidity) {
+          // Submit logFile when it's expired
+          submitLogFile();
+        }
+      } finally {
+        logWriterLock.unlock();
+      }
+
+      try {
+        // Sleep 10s before next check
+        TimeUnit.SECONDS.sleep(10);
+      } catch (InterruptedException e) {
+        // Honour the interruption: restore the flag and stop instead of looping forever
+        LOGGER.error("OperationSyncLogService been interrupted", e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Registers the log files already on disk with the protector and chooses the id of the next log
+   * file. Ids wrap around, so a gap in the sorted id list marks the boundary between the newest
+   * and the oldest remnant; files are registered starting right after the gap.
+   */
+  private void recoverRemnantLogs() {
+    List<Integer> logFileIDList = new ArrayList<>();
+    for (int id = 0; id < maxLogFileNum; id++) {
+      if (SystemFileFactory.INSTANCE.getFile(logFilePath(id)).exists()) {
+        logFileIDList.add(id);
+      }
+    }
+
+    if (logFileIDList.isEmpty()) {
+      logFileID = 0;
+      return;
+    }
+
+    // Index (into logFileIDList) of the first file after the gap, 0 when there is no gap
+    int firstIndex = 0;
+    for (int i = 0; i < logFileIDList.size() - 1; i++) {
+      if (logFileIDList.get(i + 1) - logFileIDList.get(i) > 1) {
+        firstIndex = i + 1;
+        break;
+      }
+    }
+
+    // Re-transmit the remnant logs
+    for (int i = firstIndex; i < logFileIDList.size(); i++) {
+      protector.registerLogFile(logFilePath(logFileIDList.get(i)));
+    }
+    for (int i = 0; i < firstIndex; i++) {
+      protector.registerLogFile(logFilePath(logFileIDList.get(i)));
+    }
+
+    int lastIndex = firstIndex == 0 ? logFileIDList.size() - 1 : firstIndex - 1;
+    logFileID = (logFileIDList.get(lastIndex) + 1) % maxLogFileNum;
+  }
+
+  /** Builds the path of the log file with the given id. */
+  private String logFilePath(int id) {
+    return logFileDir + File.separator + logFileName + id;
+  }
+
+  /**
+   * Forces and closes the current log, adds its length to the shared size counter and registers
+   * it with the protector. Must be called with the log writer lock held.
+   */
+  private void submitLogFile() {
+    try {
+      logWriter.force();
+    } catch (IOException e) {
+      LOGGER.error("Can't force logWrite", e);
+    }
+    incLogFileSize(logFile.length());
+
+    for (int retry = 0; retry < 5; retry++) {
+      try {
+        logWriter.close();
+      } catch (IOException e) {
+        LOGGER.warn("Can't close OperationSyncLog: {}, retrying...", logFile.getAbsolutePath());
+        // Sleep 1s and retry
+        sleepOneSecond();
+        continue;
+      }
+
+      LOGGER.info("OperationSyncLog: {} is expired and closed", logFile.getAbsolutePath());
+      break;
+    }
+
+    // createLogFile() already advanced logFileID, so the current log has the previous id
+    protector.registerLogFile(logFilePath((logFileID - 1 + maxLogFileNum) % maxLogFileNum));
+
+    logWriter = null;
+    logFile = null;
+  }
+
+  /**
+   * Creates the log file for the current id and opens a writer on it, retrying once per second
+   * until it succeeds, then advances the id.
+   */
+  private void createLogFile() {
+    logFile = SystemFileFactory.INSTANCE.getFile(logFilePath(logFileID));
+    while (true) {
+      try {
+        if (logFile.createNewFile()) {
+          logFileCreateTime = System.currentTimeMillis();
+          logWriter = new LogWriter(logFile, false);
+          LOGGER.info("Create OperationSyncLog: {}", logFile.getAbsolutePath());
+          break;
+        }
+        // createNewFile() returns false when the file already exists; back off below instead of
+        // spinning at full speed until the file disappears
+        LOGGER.warn(
+            "OperationSyncLog: {} already exists, retrying...", logFile.getAbsolutePath());
+      } catch (IOException e) {
+        LOGGER.warn("Can't create OperationSyncLog: {}, retrying...", logFile.getAbsolutePath(), e);
+      }
+      sleepOneSecond();
+    }
+    logFileID = (logFileID + 1) % maxLogFileNum;
+  }
+
+  /** Pauses between retries; an interruption only shortens the pause. */
+  private static void sleepOneSecond() {
+    try {
+      TimeUnit.SECONDS.sleep(1);
+    } catch (InterruptedException ignored) {
+      // Ignore and retry
+    }
+  }
+
+  /**
+   * Adds {@code size} to the size counter shared by all log services.
+   *
+   * @param size number of bytes to add
+   */
+  public static synchronized void incLogFileSize(long size) {
+    currentLogFileSize = currentLogFileSize + size;
+  }
+
+  /** Takes the log writer lock; pair with {@link #releaseLogWriter()}. */
+  public void acquireLogWriter() {
+    logWriterLock.lock();
+  }
+
+  /**
+   * Appends the buffer to the current log, creating a log file first when none is open. When the
+   * shared size counter has reached the configured maximum the buffer is discarded and an error
+   * is logged.
+   *
+   * @param buffer serialized plan to persist
+   * @throws IOException if the underlying writer fails
+   */
+  public void write(ByteBuffer buffer) throws IOException {
+    if (currentLogFileSize < maxLogFileSize) {
+      if (logWriter == null) {
+        // Create logFile when there are no valid
+        createLogFile();
+      }
+      logWriter.write(buffer);
+    } else {
+      LOGGER.error("The OperationSyncLog is full, new PhysicalPlans will be discarded.");
+    }
+  }
+
+  /** Releases the log writer lock taken by {@link #acquireLogWriter()}. */
+  public void releaseLogWriter() {
+    logWriterLock.unlock();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
new file mode 100644
index 0000000000..a3e21fe5a0
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+
+/** Classifies physical plans into the categories handled by operation sync. */
+public class OperationSyncPlanTypeUtils {
+
+  /**
+   * Maps a plan to its operation-sync category.
+   *
+   * @param plan the plan to classify
+   * @return {@code DDLPlan} for the schema plans listed in {@code isSchemaPlan}, {@code DMLPlan}
+   *     for insert and delete plans, {@code null} for every other plan
+   */
+  public static OperationSyncPlanType getOperationSyncPlanType(PhysicalPlan plan) {
+    if (isSchemaPlan(plan)) {
+      return OperationSyncPlanType.DDLPlan;
+    }
+    if (plan instanceof DeletePlan || plan instanceof InsertPlan) {
+      return OperationSyncPlanType.DMLPlan;
+    }
+    return null;
+  }
+
+  /** True for plans that set/delete storage groups or create/delete/alter time series. */
+  private static boolean isSchemaPlan(PhysicalPlan plan) {
+    return plan instanceof SetStorageGroupPlan
+        || plan instanceof DeleteStorageGroupPlan
+        || plan instanceof CreateTimeSeriesPlan
+        || plan instanceof CreateMultiTimeSeriesPlan
+        || plan instanceof CreateAlignedTimeSeriesPlan
+        || plan instanceof DeleteTimeSeriesPlan
+        || plan instanceof AlterTimeSeriesPlan;
+  }
+
+  public enum OperationSyncPlanType {
+    DDLPlan, // Create, update and delete schema
+    DMLPlan // insert and delete data
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
new file mode 100644
index 0000000000..2f23b97e05
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.tsfile.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * OperationSyncProducer using BlockingQueue to cache PhysicalPlan. And persist some PhysicalPlan
+ * when they are too many to transmit
+ */
+public class OperationSyncProducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProducer.class);
+
+  private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+      operationSyncQueue;
+
+  /**
+   * @param operationSyncQueue queue the plans are handed over on
+   */
+  public OperationSyncProducer(
+      BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+          operationSyncQueue) {
+    this.operationSyncQueue = operationSyncQueue;
+  }
+
+  /**
+   * Rewinds the plan buffer and enqueues the pair, blocking while the queue has no free space.
+   *
+   * <p>If the calling thread is interrupted while waiting, the plan is not enqueued, the failure
+   * is logged and the thread's interrupt status is restored so that callers can still observe the
+   * interruption.
+   *
+   * @param planPair serialized plan (left) and its operation-sync type (right)
+   */
+  public void put(Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair) {
+    try {
+      planPair.left.position(0);
+      operationSyncQueue.put(planPair);
+    } catch (InterruptedException e) {
+      LOGGER.error("OperationSync cache failed.", e);
+      // Catching InterruptedException clears the flag; set it again for the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
new file mode 100644
index 0000000000..a9ff399c3b
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.writelog.io.SingleFileLogReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Base class for the background workers that replay persisted OperationSync log files.
+ *
+ * <p>Log files are handed over with {@link #registerLogFile(String)}. The {@link #run()} loop
+ * repeatedly moves all registered files into a processing batch, reads every PhysicalPlan back
+ * from each file, re-serializes it and passes it to {@link #transmitPhysicalPlan}. Each file is
+ * deleted after it has been replayed. When nothing is registered the loop sleeps for
+ * {@code logFileValidity} seconds before checking again.
+ */
+public abstract class OperationSyncProtector implements Runnable {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProtector.class);
+  protected static final int logFileValidity =
+      IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity();
+
+  // For transmit log files
+  // Guards registeredLogFiles; processingLogFiles is only swapped while this lock is held.
+  protected final Lock logFileListLock;
+  protected List<String> registeredLogFiles;
+  protected List<String> processingLogFiles;
+
+  // For serialize PhysicalPlan
+  // Initial capacity of the reusable serialization buffer (it can still grow beyond this).
+  private static final int MAX_PHYSICALPLAN_SIZE = 16 * 1024 * 1024;
+  protected final ByteArrayOutputStream protectorByteStream;
+  // Despite its name this stream is used to SERIALIZE plans into protectorByteStream.
+  protected final DataOutputStream protectorDeserializeStream;
+
+  // Working state
+  protected volatile boolean isProtectorAtWork;
+
+  protected OperationSyncProtector() {
+    logFileListLock = new ReentrantLock();
+    registeredLogFiles = new ArrayList<>();
+
+    protectorByteStream = new ByteArrayOutputStream(MAX_PHYSICALPLAN_SIZE);
+    protectorDeserializeStream = new DataOutputStream(protectorByteStream);
+
+    isProtectorAtWork = false;
+  }
+
+  /**
+   * Registers a log file so that a later iteration of {@link #run()} replays it.
+   *
+   * @param logFile path of the OperationSync log file
+   */
+  protected void registerLogFile(String logFile) {
+    logFileListLock.lock();
+    try {
+      registeredLogFiles.add(logFile);
+    } finally {
+      logFileListLock.unlock();
+    }
+  }
+
+  /**
+   * Moves every registered log file into the processing batch and starts a fresh registration
+   * list. This method takes no lock itself; {@link #run()} calls it while holding
+   * {@code logFileListLock}.
+   */
+  protected void wrapLogFiles() {
+    processingLogFiles = new ArrayList<>(registeredLogFiles);
+    registeredLogFiles = new ArrayList<>();
+  }
+
+  /** Endless loop: drain and replay all registered log files, then sleep before checking again. */
+  @Override
+  public void run() {
+    while (true) {
+      while (true) {
+        // Wrap and transmit all OperationSyncLogs
+        logFileListLock.lock();
+        try {
+          if (!registeredLogFiles.isEmpty()) {
+            isProtectorAtWork = true;
+            wrapLogFiles();
+          } else {
+            isProtectorAtWork = false;
+            break;
+          }
+        } finally {
+          logFileListLock.unlock();
+        }
+        if (isProtectorAtWork) {
+          // replay outside the lock so registerLogFile() is not blocked during transmission
+          transmitLogFiles();
+        }
+      }
+
+      try {
+        // Sleep a while before next check
+        TimeUnit.SECONDS.sleep(logFileValidity);
+      } catch (InterruptedException e) {
+        // NOTE(review): the loop keeps running after an interrupt and the flag is not restored;
+        // restoring it here would make every following sleep fail immediately.
+        LOGGER.warn("OperationSyncProtector been interrupted", e);
+      }
+    }
+  }
+
+  /**
+   * Replays every file of the current processing batch: each PhysicalPlan is read, re-serialized
+   * into a fresh ByteBuffer and handed to {@link #transmitPhysicalPlan}. A file that cannot be
+   * opened is skipped; a plan that cannot be serialized is skipped. Replayed files are deleted.
+   */
+  protected void transmitLogFiles() {
+    preCheck();
+    for (String logFileName : processingLogFiles) {
+      File logFile = SystemFileFactory.INSTANCE.getFile(logFileName);
+      SingleFileLogReader logReader;
+      try {
+        logReader = new SingleFileLogReader(logFile);
+      } catch (FileNotFoundException e) {
+        LOGGER.error(
+            "OperationSyncProtector can't open OperationSyncLog: {}, discarded",
+            logFile.getAbsolutePath(),
+            e);
+        continue;
+      }
+
+      try {
+        while (logReader.hasNext()) {
+          // read and re-serialize the PhysicalPlan
+          PhysicalPlan nextPlan = logReader.next();
+          try {
+            nextPlan.serialize(protectorDeserializeStream);
+          } catch (IOException e) {
+            // drop the partially written bytes, otherwise they would be prepended to the
+            // next plan serialized into this shared stream
+            protectorByteStream.reset();
+            LOGGER.error("OperationSyncProtector can't serialize PhysicalPlan", e);
+            continue;
+          }
+          ByteBuffer nextBuffer = ByteBuffer.wrap(protectorByteStream.toByteArray());
+          protectorByteStream.reset();
+          transmitPhysicalPlan(nextBuffer, nextPlan);
+        }
+      } finally {
+        // release the file handle even if replaying a plan throws unexpectedly
+        logReader.close();
+      }
+
+      try {
+        // sleep one second then delete OperationSyncLog
+        TimeUnit.SECONDS.sleep(1);
+      } catch (InterruptedException e) {
+        LOGGER.warn("OperationSyncProtector is interrupted", e);
+      }
+
+      deleteLogFile(logFile);
+    }
+  }
+
+  /**
+   * Deletes a replayed log file, retrying up to five times, and updates the log size tracked by
+   * OperationSyncLogService. The size is subtracted up front and added back if the file could
+   * not be deleted.
+   */
+  private void deleteLogFile(File logFile) {
+    long logFileSize = logFile.length();
+    OperationSyncLogService.incLogFileSize(-logFileSize);
+
+    for (int retryCnt = 0; retryCnt < 5; retryCnt++) {
+      if (logFile.delete()) {
+        LOGGER.info("OperationSyncLog: {} is deleted.", logFile.getAbsolutePath());
+        return;
+      }
+      LOGGER.warn("Delete OperationSyncLog: {} failed. Retrying", logFile.getAbsolutePath());
+    }
+
+    // the file is still on disk, so it still counts towards the tracked size
+    OperationSyncLogService.incLogFileSize(logFileSize);
+    LOGGER.error("Couldn't delete OperationSyncLog: {}", logFile.getAbsolutePath());
+  }
+
+  /** Hook invoked once at the start of {@link #transmitLogFiles()}, before any file is read. */
+  protected abstract void preCheck();
+
+  /**
+   * Transmits one replayed plan.
+   *
+   * @param planBuffer the plan re-serialized into its own buffer
+   * @param physicalPlan the deserialized plan the buffer was produced from
+   */
+  protected abstract void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
new file mode 100644
index 0000000000..e754be6173
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * OperationSyncWriteTask transmits one E-Plan sent by a client. If the plan cannot be transmitted
+ * right now, it is appended to the DDL OperationSync log instead.
+ */
+public class OperationSyncWriteTask implements Runnable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncWriteTask.class);
+
+  private final ByteBuffer physicalPlanBuffer;
+  private final SessionPool operationSyncSessionPool;
+  private final OperationSyncDDLProtector ddlProtector;
+  private final OperationSyncLogService ddlLogService;
+
+  public OperationSyncWriteTask(
+      ByteBuffer physicalPlanBuffer,
+      SessionPool operationSyncSessionPool,
+      OperationSyncDDLProtector ddlProtector,
+      OperationSyncLogService ddlLogService) {
+    this.physicalPlanBuffer = physicalPlanBuffer;
+    this.operationSyncSessionPool = operationSyncSessionPool;
+    this.ddlProtector = ddlProtector;
+    this.ddlLogService = ddlLogService;
+  }
+
+  /**
+   * Transmits the plan through the session pool, or writes it to the log when the DDL protector
+   * reports that it is at work, when the connection fails, or when the pool returns false. Any
+   * other exception during transmission drops the plan after logging the error.
+   */
+  @Override
+  public void run() {
+    if (ddlProtector.isAtWork()) {
+      // the protector is busy; persist the plan instead of transmitting it directly
+      serializeEPlan();
+      return;
+    }
+
+    boolean transmitted = false;
+    try {
+      physicalPlanBuffer.position(0);
+      transmitted = operationSyncSessionPool.operationSyncTransmit(physicalPlanBuffer);
+    } catch (IoTDBConnectionException connectionException) {
+      // network problem: only warn here, the plan is persisted below
+      LOGGER.warn(
+          "OperationSyncWriteTask can't transmit because network failure", connectionException);
+    } catch (Exception e) {
+      // the PhysicalPlan itself is faulty, so it is rejected and not persisted
+      LOGGER.error("OperationSyncWriteTask can't transmit", e);
+      return;
+    }
+
+    if (!transmitted) {
+      serializeEPlan();
+    }
+  }
+
+  /** Appends the plan buffer to the DDL log while holding the log writer. */
+  private void serializeEPlan() {
+    try {
+      // the log service expects the buffer position to be at limit() before writing
+      physicalPlanBuffer.position(physicalPlanBuffer.limit());
+      ddlLogService.acquireLogWriter();
+      ddlLogService.write(physicalPlanBuffer);
+    } catch (IOException e) {
+      LOGGER.error("can't serialize current PhysicalPlan", e);
+    } finally {
+      ddlLogService.releaseLogWriter();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 3f9dce62b3..a556bc68d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,6 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
+import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -109,6 +116,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -120,6 +128,7 @@ import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.service.rpc.thrift.TSTracingInfo;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -127,11 +136,14 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.SQLException;
@@ -142,6 +154,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -302,9 +316,63 @@ public class TSServiceImpl implements TSIService.Iface {
protected final ServiceProvider serviceProvider;
+ /* OperationSync module */
+ // Read once from the configuration when the class is loaded.
+ private static final boolean isEnableOperationSync =
+ IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+ // The four fields below are assigned in the constructor and are null when OperationSync is
+ // disabled.
+ // Session pool connected to the configured secondary address/port.
+ private final SessionPool operationSyncsessionPool;
+ // Entry point for DML plans; they are queued here for the OperationSyncConsumer threads.
+ private final OperationSyncProducer operationSyncProducer;
+ // DDL protector and DDL log service, passed to every OperationSyncWriteTask created for a DDL
+ // plan.
+ private final OperationSyncDDLProtector operationSyncDDLProtector;
+ private final OperationSyncLogService operationSyncDDLLogService;
+
public TSServiceImpl() {
super();
serviceProvider = IoTDB.serviceProvider;
+
+ // When OperationSync is enabled, build the whole pipeline: session pool to the secondary
+ // node, DDL protector + log service, DML producer queue, DML protector + log service, and
+ // the consumer threads. Otherwise all OperationSync fields are set to null.
+ // NOTE(review): every worker is started with a raw `new Thread(...)` whose reference is not
+ // kept, and this runs per TSServiceImpl instance - confirm the class is only instantiated
+ // once and that nothing needs to stop these threads.
+ if (isEnableOperationSync) {
+ /* Open OperationSync */
+ IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ // create SessionPool for OperationSync
+ // (the literal 5 is presumably the pool's maximum size - TODO confirm)
+ operationSyncsessionPool =
+ new SessionPool(
+ config.getSecondaryAddress(),
+ config.getSecondaryPort(),
+ config.getSecondaryUser(),
+ config.getSecondaryPassword(),
+ 5);
+
+ // create operationSyncDDLProtector and operationSyncDDLLogService
+ operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+ new Thread(operationSyncDDLProtector).start();
+ operationSyncDDLLogService =
+ new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+ new Thread(operationSyncDDLLogService).start();
+
+ // create OperationSyncProducer
+ // the bounded queue is shared by the producer and all consumers created below
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+ operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+ // create OperationSyncDMLProtector and OperationSyncDMLLogService
+ OperationSyncDMLProtector operationSyncDMLProtector =
+ new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+ new Thread(operationSyncDMLProtector).start();
+ OperationSyncLogService operationSyncDMLLogService =
+ new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+ new Thread(operationSyncDMLLogService).start();
+
+ // create OperationSyncConsumer
+ // one thread per consumer; the count comes from the configuration
+ for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+ OperationSyncConsumer consumer =
+ new OperationSyncConsumer(
+ blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
+ new Thread(consumer).start();
+ }
+ } else {
+ operationSyncsessionPool = null;
+ operationSyncProducer = null;
+ operationSyncDDLProtector = null;
+ operationSyncDDLLogService = null;
+ }
}
@Override
@@ -1484,7 +1552,12 @@ public class TSServiceImpl implements TSIService.Iface {
req.values,
req.isAligned);
TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
- return status != null ? status : executeNonQueryPlan(plan);
+
+ if (status != null) {
+ return status;
+ }
+
+ return executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_RECORD, e.getErrorCode());
} catch (Exception e) {
@@ -1515,7 +1588,12 @@ public class TSServiceImpl implements TSIService.Iface {
plan.setNeedInferType(true);
plan.setAligned(req.isAligned);
TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
- return status != null ? status : executeNonQueryPlan(plan);
+
+ if (status != null) {
+ return status;
+ }
+
+ return executeNonQueryPlan(plan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_STRING_RECORD, e.getErrorCode());
} catch (Exception e) {
@@ -1571,7 +1649,11 @@ public class TSServiceImpl implements TSIService.Iface {
insertTabletPlan.setAligned(req.isAligned);
TSStatus status = serviceProvider.checkAuthority(insertTabletPlan, req.getSessionId());
- return status != null ? status : executeNonQueryPlan(insertTabletPlan);
+ if (status != null) {
+ return status;
+ }
+
+ return executeNonQueryPlan(insertTabletPlan);
} catch (IoTDBException e) {
return onIoTDBException(e, OperationType.INSERT_TABLET, e.getErrorCode());
} catch (Exception e) {
@@ -2057,7 +2139,82 @@ public class TSServiceImpl implements TSIService.Iface {
return status != null ? status : executeNonQueryPlan(plan);
}
+ /**
+  * Receiving side of OperationSync: deserializes the PhysicalPlan carried by the request and
+  * executes it locally.
+  *
+  * <p>NOTE(review): no login or authority check on req.sessionId is visible in this method -
+  * confirm whether that is intended.
+  *
+  * @param req request whose physicalPlan field holds the serialized plan
+  * @return INTERNAL_SERVER_ERROR if the plan cannot be deserialized, EXECUTE_STATEMENT_ERROR if
+  *     the plan type is not supported by OperationSync or execution reports failure, otherwise
+  *     SUCCESS_STATUS (exceptions during execution are mapped by onNonQueryException)
+  */
+ @Override
+ public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
+   PhysicalPlan receivedPlan;
+   try {
+     // rewind before decoding so the plan is read from the first byte
+     ByteBuffer serializedPlan = req.physicalPlan;
+     serializedPlan.position(0);
+     receivedPlan = PhysicalPlan.Factory.create(serializedPlan);
+   } catch (IllegalPathException | IOException e) {
+     LOGGER.error("OperationSync deserialization failed.", e);
+     TSStatus failure = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+     return failure.setMessage(e.getMessage());
+   }
+
+   // plans that OperationSync does not classify as DDL/DML are rejected
+   if (OperationSyncPlanTypeUtils.getOperationSyncPlanType(receivedPlan) == null) {
+     LOGGER.error(
+         "OperationSync receive unsupported PhysicalPlan type: {}",
+         receivedPlan.getOperatorName());
+     return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   }
+
+   try {
+     if (serviceProvider.executeNonQuery(receivedPlan)) {
+       return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+     }
+     return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   } catch (Exception e) {
+     return onNonQueryException(e, OperationType.EXECUTE_NON_QUERY_PLAN);
+   }
+ }
+
+ /**
+  * Sending side of OperationSync: serializes the plan and forwards it according to its type.
+  * DDL plans are transmitted synchronously on the calling thread through an
+  * OperationSyncWriteTask; DML plans are queued in the OperationSyncProducer. Plans without an
+  * OperationSync type are ignored, and a plan that fails to serialize is logged and skipped.
+  */
+ private void transmitOperationSync(PhysicalPlan physicalPlan) {
+   OperationSyncPlanTypeUtils.OperationSyncPlanType syncType =
+       OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
+   if (syncType == null) {
+     // this plan does not take part in OperationSync
+     return;
+   }
+
+   // serialize the plan into a buffer of its own
+   ByteBuffer serializedPlan;
+   try {
+     ByteArrayOutputStream planBytes =
+         new ByteArrayOutputStream(physicalPlan.getSerializedSize());
+     physicalPlan.serialize(new DataOutputStream(planBytes));
+     serializedPlan = ByteBuffer.wrap(planBytes.toByteArray());
+   } catch (IOException e) {
+     LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
+     return;
+   }
+
+   if (syncType == OperationSyncPlanTypeUtils.OperationSyncPlanType.DDLPlan) {
+     // run() is invoked directly, so the caller waits until the DDL plan has been handled
+     new OperationSyncWriteTask(
+             serializedPlan,
+             operationSyncsessionPool,
+             operationSyncDDLProtector,
+             operationSyncDDLLogService)
+         .run();
+   } else if (syncType == OperationSyncPlanTypeUtils.OperationSyncPlanType.DMLPlan) {
+     // DML plans are sent asynchronously by the consumer threads
+     operationSyncProducer.put(new Pair<>(serializedPlan, syncType));
+   }
+ }
+
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
+ if (isEnableOperationSync) {
+ // OperationSync should transmit before execute
+ transmitOperationSync(plan);
+ }
+
try {
return serviceProvider.executeNonQuery(plan)
? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully")
diff --git a/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java b/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
new file mode 100644
index 0000000000..3b110ebc8b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.doublelive;
+
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Manual test utilities for DoubleLive. Start two IoTDB instances with OperationSync enabled
+ * before using them; every method talks to the instance listening on 127.0.0.1:6667.
+ */
+public class OperationSyncManualTestUtils {
+
+  private static final SessionPool sessionPool =
+      new SessionPool("127.0.0.1", 6667, "root", "root", 3);
+
+  // Generated paths look like root.sg<i>.d<j>.s<k>.
+  private static final String sg = "root.sg";
+  private static final int sgCnt = 10;
+  private static final String d = ".d";
+  private static final int dCnt = 20;
+  private static final String s = ".s";
+  private static final int sCnt = 100;
+  // Number of records written per device; timestamps run from 0 to dataCnt - 1.
+  private static final int dataCnt = 1000;
+
+  /** Creates the storage groups root.sg0 .. root.sg(sgCnt-1). */
+  public void setStorageGroups() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      sessionPool.setStorageGroup(sg + i);
+    }
+  }
+
+  /** Deletes the storage groups one by one. */
+  public void deleteStorageGroups() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      sessionPool.deleteStorageGroups(Collections.singletonList(sg + i));
+    }
+  }
+
+  /** Creates one INT32 / PLAIN / UNCOMPRESSED timeseries for every generated path. */
+  public void createTimeSeries() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        for (int k = 0; k < sCnt; k++) {
+          String S = s + k;
+          sessionPool.createTimeseries(
+              SG + D + S, TSDataType.INT32, TSEncoding.PLAIN, CompressionType.UNCOMPRESSED);
+        }
+      }
+    }
+  }
+
+  /** Deletes every generated timeseries. */
+  public void deleteTimeSeries() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        for (int k = 0; k < sCnt; k++) {
+          String S = s + k;
+          sessionPool.deleteTimeseries(SG + D + S);
+        }
+      }
+    }
+  }
+
+  /**
+   * Inserts dataCnt records into every device (one insertRecord call per record covering all
+   * sCnt measurements) and prints the average wall-clock time per insertRecord call.
+   */
+  public void insertData() throws IoTDBConnectionException, StatementExecutionException {
+    long startTime = System.currentTimeMillis();
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        String device = SG + D;
+        List<String> measurements = new ArrayList<>();
+        List<TSDataType> types = new ArrayList<>();
+        for (int k = 0; k < sCnt; k++) {
+          measurements.add("s" + k);
+          types.add(TSDataType.INT32);
+        }
+        for (int l = 0; l < dataCnt; l++) {
+          List<Object> values = new ArrayList<>();
+          for (int k = 0; k < sCnt; k++) {
+            values.add(l);
+          }
+          sessionPool.insertRecord(device, l, measurements, types, values);
+        }
+      }
+    }
+    long endTime = System.currentTimeMillis();
+    // the loops above issue sgCnt * dCnt * dataCnt insertRecord calls (a product, not a sum)
+    long insertCnt = (long) sgCnt * dCnt * dataCnt;
+    System.out.println(
+        "Avg time per insert: " + ((endTime - startTime) / (double) insertCnt) + "ms");
+  }
+
+  /** Deletes the data of every generated timeseries in the time range [0, dataCnt]. */
+  public void deleteData() throws IoTDBConnectionException, StatementExecutionException {
+    for (int i = 0; i < sgCnt; i++) {
+      String SG = sg + i;
+      for (int j = 0; j < dCnt; j++) {
+        String D = d + j;
+        for (int k = 0; k < sCnt; k++) {
+          String S = s + k;
+          sessionPool.deleteData(Collections.singletonList(SG + D + S), 0, dataCnt);
+        }
+      }
+    }
+  }
+}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index b7bf9d9185..cb483c96ab 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
@@ -2460,6 +2461,24 @@ public class Session {
}
}
+ /**
+  * Sends one serialized plan to the server through the default connection for OperationSync.
+  *
+  * @param buffer the serialized plan, wrapped into a TSOperationSyncWriteReq
+  */
+ public void operationSyncTransmit(ByteBuffer buffer)
+     throws IoTDBConnectionException, StatementExecutionException {
+   TSOperationSyncWriteReq syncRequest = genTSExecuteOperationSyncReq(buffer);
+   try {
+     defaultSessionConnection.executeOperationSync(syncRequest);
+   } catch (RedirectException ignored) {
+     // ignored
+   }
+ }
+
+ /** Builds the OperationSync request for the given plan; the sync type is always sent as 0. */
+ private TSOperationSyncWriteReq genTSExecuteOperationSyncReq(ByteBuffer buffer) {
+   TSOperationSyncWriteReq syncRequest = new TSOperationSyncWriteReq();
+   syncRequest.setPhysicalPlan(buffer);
+   syncRequest.setOperationSyncType((byte) 0);
+   return syncRequest;
+ }
+
public boolean isEnableQueryRedirection() {
return enableQueryRedirection;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 930f5baaa1..71c1f28c48 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
@@ -942,6 +943,25 @@ public class SessionConnection {
}
}
+ /**
+  * Executes an OperationSync request on the server. If the first call fails with a TException,
+  * one reconnect is attempted and the request is sent once more.
+  *
+  * @throws IoTDBConnectionException if reconnecting fails or the retry also hits a TException
+  */
+ protected void executeOperationSync(TSOperationSyncWriteReq request)
+     throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+   request.setSessionId(sessionId);
+   try {
+     RpcUtils.verifySuccessWithRedirection(client.executeOperationSync(request));
+   } catch (TException firstFailure) {
+     if (!reconnect()) {
+       throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
+     }
+     try {
+       // set the session id again - presumably reconnect() opens a new session (TODO confirm)
+       request.setSessionId(sessionId);
+       RpcUtils.verifySuccess(client.executeOperationSync(request));
+     } catch (TException retryFailure) {
+       throw new IoTDBConnectionException(retryFailure);
+     }
+   }
+ }
+
public boolean isEnableRedirect() {
return enableRedirect;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index 37249b9971..9fa6ff9646 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;
@@ -2257,6 +2258,28 @@ public class SessionPool {
return null;
}
+ /** Transmit insert record request for OperationSync */
+ public boolean operationSyncTransmit(ByteBuffer buffer)
+ throws IoTDBConnectionException, StatementExecutionException {
+ for (int i = 0; i < RETRY; i++) {
+ Session session = getSession();
+ try {
+ buffer.position(0);
+ session.operationSyncTransmit(buffer);
+ putBack(session);
+ return true;
+ } catch (IoTDBConnectionException e) {
+ // TException means the connection is broken, remove it and get a new one.
+ cleanSessionAndMayThrowConnectionException(session, i, e);
+ } catch (StatementExecutionException | RuntimeException e) {
+ putBack(session);
+ throw e;
+ }
+ }
+
+ return false;
+ }
+
public int getMaxSize() {
return maxSize;
}
diff --git a/thrift/rpc-changelist.md b/thrift/rpc-changelist.md
index 43570d8f52..f3095478f1 100644
--- a/thrift/rpc-changelist.md
+++ b/thrift/rpc-changelist.md
@@ -30,14 +30,15 @@ Last Updated on 2022.1.17 by Xin Zhao.
## 2. Add New
-| Latest Changes | Related Committers |
-|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|
-| Add TSTracingInfo | Minghui Liu |
+| Latest Changes | Related Committers |
+|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------|
+| Add TSTracingInfo | Minghui Liu |
| Add structs and interfaces to append, prune, query and unset Schema Template (detail: TSAppendSchemaTemplateReq, TSPruneSchemaTemplateReq, TSQueryTemplateReq, TSQueryTemplateResp, TSUnsetSchemaTemplateReq, appendSchemaTemplate, pruneSchemaTemplate, querySchemaTemplate, unsetSchemaTemplate), and serializedTemplate in TSCreateSchemaTemplateReq | Xin Zhao |
-| Add struct TSInsertStringRecordsOfOneDeviceReq | Hang Zhang |
-| Add method TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req) | Hang Zhang |
-| Add TSDropSchemaTemplateReq, TSStatus dropSchemaTemplate | Xin Zhao |
+| Add struct TSInsertStringRecordsOfOneDeviceReq | Hang Zhang |
+| Add method TSStatus insertStringRecordsOfOneDevice(1:TSInsertStringRecordsOfOneDeviceReq req) | Hang Zhang |
+| Add TSDropSchemaTemplateReq, TSStatus dropSchemaTemplate | Xin Zhao |
| Add TSCreateAlignedTimeseriesReq | Haonan Hou |
+| Add TSOperationSyncWriteReq | Rongzhao Chen |
## 3. Update
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 78ccce6db6..0f699b4684 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -416,6 +416,12 @@ struct TSDropSchemaTemplateReq {
2: required string templateName
}
+// Request of executeOperationSync: carries one serialized PhysicalPlan to another IoTDB instance.
+struct TSOperationSyncWriteReq {
+ // session id, filled in by the client's SessionConnection before the call
+ 1: required i64 sessionId
+ // the Java Session client in this commit always sends 0
+ 2: required byte operationSyncType
+ // serialized PhysicalPlan; the server decodes it with PhysicalPlan.Factory.create
+ 3: required binary physicalPlan
+}
+
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -506,4 +512,6 @@ service TSIService {
TSStatus unsetSchemaTemplate(1:TSUnsetSchemaTemplateReq req);
TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
+
+ TSStatus executeOperationSync(1:TSOperationSyncWriteReq req);
}
\ No newline at end of file