You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/14 03:02:04 UTC
[iotdb] branch double_live updated: [To double_live] Support alter timesereis (#5506)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch double_live
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/double_live by this push:
new 60487b6925 [To double_live] Support alter timesereis (#5506)
60487b6925 is described below
commit 60487b69252f31cf1182feec60193e194030521e
Author: YongzaoDan <33...@users.noreply.github.com>
AuthorDate: Thu Apr 14 11:01:58 2022 +0800
[To double_live] Support alter timesereis (#5506)
---
example/{doublewrite => operationsync}/pom.xml | 4 +-
.../OperationSyncDegradationRate.java} | 27 ++--
.../operationsync/OperationSyncDurability.java} | 18 +--
.../iotdb/operationsync/OperationSyncEmpty.java} | 4 +-
.../iotdb/operationsync/OperationSyncExample.java} | 17 +--
.../iotdb/operationsync/OperationSyncThread.java} | 8 +-
.../iotdb/operationsync/OperationSyncUtil.java} | 34 ++---
example/pom.xml | 2 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 100 +++++++-------
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 4 +-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 43 +++---
.../OperationSyncConsumer.java} | 54 ++++----
.../OperationSyncDDLProtector.java} | 22 +--
.../OperationSyncDMLProtector.java} | 17 +--
.../OperationSyncLogService.java} | 30 ++---
.../OperationSyncPlanTypeUtils.java} | 18 +--
.../OperationSyncProducer.java} | 28 ++--
.../OperationSyncProtector.java} | 32 ++---
.../OperationSyncWriteTask.java} | 31 ++---
.../db/service/thrift/impl/TSServiceImpl.java | 149 +++++++++++----------
.../OperationSyncManualTestUtils.java} | 4 +-
.../java/org/apache/iotdb/session/Session.java | 16 +--
.../apache/iotdb/session/SessionConnection.java | 8 +-
.../org/apache/iotdb/session/pool/SessionPool.java | 6 +-
thrift/src/main/thrift/rpc.thrift | 6 +-
25 files changed, 346 insertions(+), 336 deletions(-)
diff --git a/example/doublewrite/pom.xml b/example/operationsync/pom.xml
similarity index 96%
rename from example/doublewrite/pom.xml
rename to example/operationsync/pom.xml
index 238f43224f..70c48c6b33 100644
--- a/example/doublewrite/pom.xml
+++ b/example/operationsync/pom.xml
@@ -27,8 +27,8 @@
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>doublewrite-example</artifactId>
- <name>double-write-example</name>
+ <artifactId>operationsync-example</artifactId>
+ <name>operation-sync-example</name>
<dependencies>
<dependency>
<groupId>org.apache.iotdb</groupId>
diff --git a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncDegradationRate.java
similarity index 67%
rename from example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
rename to example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncDegradationRate.java
index df3e195cff..a23722de1d 100644
--- a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDegradationRate.java
+++ b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncDegradationRate.java
@@ -16,23 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.doublewrite;
+package org.apache.iotdb.operationsync;
/**
- * This is a double write insertion degradation rate test java class, which shows the performance
- * impact when enable double write feature. You can run this code in the same way as
- * DoubleWriteExample.java. Since IoTDB-A enable the double write feature, the performance impact is
- * correct only when Both IoTDB-A and IoTDB-B run on the same computer. Or you can modify the
- * default configuration of IoTDB-A and IoTDB-B after becoming familiar with DoubleWriteExample,
- * DoubleWriteUtil to get A more accurate performance impact estimate from two remote computers.
+ * This is an operation sync insertion degradation rate test class, which shows the performance
+ * impact of enabling the operation sync feature. You can run this code in the same way as
+ * OperationSyncExample.java. Since IoTDB-A enables the operation sync feature, the measured
+ * impact is accurate only when both IoTDB-A and IoTDB-B run on the same computer. Alternatively,
+ * modify the default configuration of IoTDB-A and IoTDB-B, after becoming familiar with
+ * OperationSyncExample and OperationSyncUtil, to get a more accurate performance impact estimate
+ * from two remote computers.
*/
-public class DoubleWriteDegradationRate extends DoubleWriteUtil {
+public class OperationSyncDegradationRate extends OperationSyncUtil {
private static final String dA = "d0";
private static final String dB = "d1";
/**
- * The following three fields are insert configuration parameters. The double write feature
+ * The following three fields are insert configuration parameters. The operation sync feature
* already applies to all write interfaces, so you are free to modify these parameters.
*/
// Total insertion requests during test
@@ -54,7 +55,7 @@ public class DoubleWriteDegradationRate extends DoubleWriteUtil {
long startTime = System.currentTimeMillis();
threadA.start();
threadA.join();
- double doubleWriteCost = System.currentTimeMillis() - startTime;
+ double operationSyncost = System.currentTimeMillis() - startTime;
startTime = System.currentTimeMillis();
threadB.start();
@@ -65,11 +66,11 @@ public class DoubleWriteDegradationRate extends DoubleWriteUtil {
double total = batchCnt * batchSize;
System.out.println("Normal write cost: " + normalWriteCost / 1000.0 + "s");
System.out.println("Average: " + normalWriteCost / total + " ms per insertion");
- System.out.println("Double write cost: " + doubleWriteCost / 1000.0 + "s");
- System.out.println("Average: " + doubleWriteCost / total + " ms per insertion");
+ System.out.println("Operation Sync cost: " + operationSyncost / 1000.0 + "s");
+ System.out.println("Average: " + operationSyncost / total + " ms per insertion");
System.out.println(
"Performance degradation rate : "
- + (doubleWriteCost - normalWriteCost) / normalWriteCost * 100.0
+ + (operationSyncost - normalWriteCost) / normalWriteCost * 100.0
+ "%");
}
}
diff --git a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncDurability.java
similarity index 76%
rename from example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
rename to example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncDurability.java
index 5cfafb4a83..895db6a756 100644
--- a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteDurability.java
+++ b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncDurability.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.doublewrite;
+package org.apache.iotdb.operationsync;
import org.apache.iotdb.session.pool.SessionPool;
-public class DoubleWriteDurability extends DoubleWriteUtil {
+public class OperationSyncDurability extends OperationSyncUtil {
private static final String dA = "d0";
private static final String dB = "d1";
@@ -41,13 +41,13 @@ public class DoubleWriteDurability extends DoubleWriteUtil {
}
sessionPoolA.setStorageGroup(sg);
- // Create double write threads
- DoubleWriteThread doubleWriteThreadA =
- new DoubleWriteThread(sessionPoolA, dA, batchCnt, timeseriesCnt, batchSize);
- threadA = new Thread(doubleWriteThreadA);
- DoubleWriteThread doubleWriteThreadB =
- new DoubleWriteThread(sessionPoolA, dB, batchCnt, timeseriesCnt, batchSize);
- threadB = new Thread(doubleWriteThreadB);
+ // Create operation sync threads
+ OperationSyncThread operationSyncThreadA =
+ new OperationSyncThread(sessionPoolA, dA, batchCnt, timeseriesCnt, batchSize);
+ threadA = new Thread(operationSyncThreadA);
+ OperationSyncThread operationSyncThreadB =
+ new OperationSyncThread(sessionPoolA, dB, batchCnt, timeseriesCnt, batchSize);
+ threadB = new Thread(operationSyncThreadB);
insertData();
}
diff --git a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncEmpty.java
similarity index 89%
rename from example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
rename to example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncEmpty.java
index 73ff3a2fee..a6878c9c82 100644
--- a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteEmpty.java
+++ b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncEmpty.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.doublewrite;
+package org.apache.iotdb.operationsync;
-public class DoubleWriteEmpty extends DoubleWriteUtil {
+public class OperationSyncEmpty extends OperationSyncUtil {
public static void main(String[] args) {
initEnvironment();
diff --git a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncExample.java
similarity index 80%
rename from example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
rename to example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncExample.java
index fc363b1015..e9c3f3686c 100644
--- a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteExample.java
+++ b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncExample.java
@@ -16,22 +16,23 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.doublewrite;
+package org.apache.iotdb.operationsync;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionDataSetWrapper;
/**
- * This is a simple double write example java class, which shows how to enable double write feature
- * and proves that the double write feature performs correctly. To run this code, you need to start
- * another IoTDB service. The easiest way to do this is to copy the
+ * This is a simple operation sync example class, which shows how to enable the operation sync
+ * feature and proves that the operation sync feature performs correctly. To run this code, you
+ * need to start another IoTDB service. The easiest way to do this is to copy the
* ./server/target/iotdb-server-0.12.5-SNAPSHOT folder to any location. Then change the default port
* in iotdb-engine.properties to 6668, and start the service using the start-server script. Or you
- * can start IoTDB-B on another computer and modify the configuration of IoTDB-B in DoubleWriteUtil.
- * Finally, you can run this code and see double write feature from the command line.
+ * can start IoTDB-B on another computer and modify the configuration of IoTDB-B in
+ * OperationSyncUtil. Finally, you can run this code and see the operation sync feature in action
+ * from the command line.
*/
-public class DoubleWriteExample extends DoubleWriteUtil {
+public class OperationSyncExample extends OperationSyncUtil {
private static final String dA = "d0";
private static final String dB = "d0";
@@ -40,7 +41,7 @@ public class DoubleWriteExample extends DoubleWriteUtil {
private static final int timeseriesCnt = 1;
private static final int batchSize = 1;
- private static final String sql = "select * from root.DOUBLEWRITESG.d0";
+ private static final String sql = "select * from root.OPERATIONSYNCSG.d0";
public static void main(String[] args) throws Exception {
initEnvironment();
diff --git a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncThread.java
similarity index 95%
rename from example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
rename to example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncThread.java
index 9e9a2a49ed..3ab5dd1b16 100644
--- a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteThread.java
+++ b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncThread.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.doublewrite;
+package org.apache.iotdb.operationsync;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -29,18 +29,18 @@ import java.util.ArrayList;
import java.util.List;
/** Thread for insertion. Normally you don't need to modify this java class. */
-public class DoubleWriteThread implements Runnable {
+public class OperationSyncThread implements Runnable {
private final SessionPool sessionPool;
- private final String sg = "root.DOUBLEWRITESG";
+ private final String sg = "root.OPERATIONSYNCSG";
private final String deviceId;
private final int batchCnt;
private final int timeseriesCnt;
private final int batchSize;
- DoubleWriteThread(
+ OperationSyncThread(
SessionPool sessionPool, String deviceId, int batchCnt, int timeseriesCnt, int batchSize)
throws IoTDBConnectionException, StatementExecutionException {
this.sessionPool = sessionPool;
diff --git a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncUtil.java
similarity index 79%
rename from example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
rename to example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncUtil.java
index be65248a32..85ea25dbe2 100644
--- a/example/doublewrite/src/main/java/org/apache/iotdb/doublewrite/DoubleWriteUtil.java
+++ b/example/operationsync/src/main/java/org/apache/iotdb/operationsync/OperationSyncUtil.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.doublewrite;
+package org.apache.iotdb.operationsync;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -26,10 +26,10 @@ import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.pool.SessionPool;
/**
- * This java class is used to create the double write examples environment. You can set IoTDB-B
+ * This java class is used to create the operation sync examples environment. You can set IoTDB-B
* config here
*/
-public abstract class DoubleWriteUtil {
+public abstract class OperationSyncUtil {
// IoTDB-A config
// Started by EnvironmentUtils, shouldn't be modified
@@ -51,22 +51,22 @@ public abstract class DoubleWriteUtil {
protected static final int concurrency = 5;
// Default name of StorageGroup
- protected static final String sg = "root.DOUBLEWRITESG";
+ protected static final String sg = "root.OPERATIONSYNCSG";
- // Threads for double write
+ // Threads for operation sync
protected static Thread threadA;
protected static Thread threadB;
protected static void initEnvironment() {
- // Start local IoTDB-A on ip "127.0.0.1", port 6667 and set enableDoubleWrite
+ // Start local IoTDB-A on ip "127.0.0.1", port 6667 and set enableOperationSync
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- config.setEnableDoubleWrite(true);
+ config.setEnableOperationSync(true);
config.setSecondaryAddress(ipB);
config.setSecondaryPort(portB);
config.setSecondaryUser(userB);
config.setSecondaryPassword(passwordB);
- config.setDoubleWriteMaxLogSize(1024);
+ config.setOperationSyncMaxLogSize(1024);
EnvironmentUtils.envSetUp();
}
@@ -92,19 +92,19 @@ public abstract class DoubleWriteUtil {
sessionPoolA.setStorageGroup(sg);
sessionPoolB.setStorageGroup(sg);
- // Create double write threads
- DoubleWriteThread doubleWriteThreadA =
- new DoubleWriteThread(sessionPoolA, dA, batchCnt, timeseriesCnt, batchSize);
- threadA = new Thread(doubleWriteThreadA);
- DoubleWriteThread doubleWriteThreadB =
- new DoubleWriteThread(sessionPoolB, dB, batchCnt, timeseriesCnt, batchSize);
- threadB = new Thread(doubleWriteThreadB);
+ // Create operation sync threads
+ OperationSyncThread operationSyncThreadA =
+ new OperationSyncThread(sessionPoolA, dA, batchCnt, timeseriesCnt, batchSize);
+ threadA = new Thread(operationSyncThreadA);
+ OperationSyncThread operationSyncThreadB =
+ new OperationSyncThread(sessionPoolB, dB, batchCnt, timeseriesCnt, batchSize);
+ threadB = new Thread(operationSyncThreadB);
}
protected static void cleanEnvironment() throws Exception {
// Clean StorageGroups, close sessionPools and shut down environment
- sessionPoolA.deleteStorageGroup("root.DOUBLEWRITESG");
- sessionPoolB.deleteStorageGroup("root.DOUBLEWRITESG");
+ sessionPoolA.deleteStorageGroup(sg);
+ sessionPoolB.deleteStorageGroup(sg);
sessionPoolA.close();
sessionPoolB.close();
diff --git a/example/pom.xml b/example/pom.xml
index aced1b0801..04695c8a3d 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -47,7 +47,7 @@
<module>udf</module>
<module>trigger</module>
<module>rabbitmq</module>
- <module>doublewrite</module>
+ <module>operationsync</module>
</modules>
<build>
<pluginManagement>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 81c05aca0f..a64066b00d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -820,8 +820,8 @@ public class IoTDBConfig {
/** Encryption provided class parameter */
private String encryptDecryptProviderParameter;
- // DoubleWrite Config
- private boolean enableDoubleWrite = false;
+ // Operation Sync Config
+ private boolean enableOperationSync = false;
// Secondary IoTDB
private String secondaryAddress = "127.0.0.1";
@@ -829,23 +829,23 @@ public class IoTDBConfig {
private String secondaryUser = "root";
private String secondaryPassword = "root";
- // The transmitting concurrency size of double write SessionPool
- private int doubleWriteSessionConcurrencySize = 8;
+ // The transmitting concurrency size of operation sync SessionPool
+ private int OperationSyncSessionConcurrencySize = 8;
- // DoubleWriteLog dir
- private String doubleWriteLogDir =
- DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DOUBLEWRITE_FOLDER_NAME;
- // The validity of each DoubleWriteLog
- private int doubleWriteLogValidity = 30;
- // The maximum id of DoubleWriteLog
- private int doubleWriteLogNum = 32767;
- // The max size of all the DoubleWriteLog. Default is 100GB
- private long doubleWriteMaxLogSize = 107374182400L;
+ // OperationSyncLog dir
+ private String operationSyncLogDir =
+ DEFAULT_BASE_DIR + File.separator + IoTDBConstant.OPERATION_SYNC_FOLDER_NAME;
+ // The validity of each OperationSyncLog
+ private int operationSyncLogValidity = 30;
+ // The maximum id of OperationSyncLog
+ private int operationSyncLogNum = 32767;
+ // The max size of all the OperationSyncLog. Default is 100GB
+ private long operationSyncMaxLogSize = 107374182400L;
- // DoubleWrite InsertPlan cache size
- private int doubleWriteProducerCacheSize = 1024;
- // DoubleWriteConsumer concurrency size
- private int doubleWriteConsumerConcurrencySize = 4;
+ // OperationSyncProducer DML cache size
+ private int operationSyncProducerCacheSize = 1024;
+ // OperationSyncConsumer concurrency size
+ private int operationSyncConsumerConcurrencySize = 4;
public IoTDBConfig() {
// empty constructor
@@ -980,7 +980,7 @@ public class IoTDBConfig {
extDir = addHomeDir(extDir);
udfDir = addHomeDir(udfDir);
triggerDir = addHomeDir(triggerDir);
- doubleWriteLogDir = addHomeDir(doubleWriteLogDir);
+ operationSyncLogDir = addHomeDir(operationSyncLogDir);
if (TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS)) {
String hdfsDir = getHdfsDir();
@@ -2594,12 +2594,12 @@ public class IoTDBConfig {
this.encryptDecryptProviderParameter = encryptDecryptProviderParameter;
}
- public boolean isEnableDoubleWrite() {
- return enableDoubleWrite;
+ public boolean isEnableOperationSync() {
+ return enableOperationSync;
}
- public void setEnableDoubleWrite(boolean enableDoubleWrite) {
- this.enableDoubleWrite = enableDoubleWrite;
+ public void setEnableOperationSync(boolean enableOperationSync) {
+ this.enableOperationSync = enableOperationSync;
}
public String getSecondaryAddress() {
@@ -2634,59 +2634,59 @@ public class IoTDBConfig {
this.secondaryPassword = secondaryPassword;
}
- public int getDoubleWriteSessionConcurrencySize() {
- return doubleWriteSessionConcurrencySize;
+ public int getOperationSyncSessionConcurrencySize() {
+ return OperationSyncSessionConcurrencySize;
}
- public void setDoubleWriteSessionConcurrencySize(int doubleWriteSessionConcurrencySize) {
- this.doubleWriteSessionConcurrencySize = doubleWriteSessionConcurrencySize;
+ public void setOperationSyncSessionConcurrencySize(int operationSyncSessionConcurrencySize) {
+ this.OperationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
}
- public String getDoubleWriteLogDir() {
- return doubleWriteLogDir;
+ public String getOperationSyncLogDir() {
+ return operationSyncLogDir;
}
- public void setDoubleWriteLogDir(String doubleWriteLogDir) {
- this.doubleWriteLogDir = doubleWriteLogDir;
+ public void setOperationSyncLogDir(String operationSyncLogDir) {
+ this.operationSyncLogDir = operationSyncLogDir;
}
- public int getDoubleWriteLogValidity() {
- return doubleWriteLogValidity;
+ public int getOperationSyncLogValidity() {
+ return operationSyncLogValidity;
}
- public void setDoubleWriteLogValidity(int doubleWriteLogValidity) {
- this.doubleWriteLogValidity = doubleWriteLogValidity;
+ public void setOperationSyncLogValidity(int operationSyncLogValidity) {
+ this.operationSyncLogValidity = operationSyncLogValidity;
}
- public int getDoubleWriteLogNum() {
- return doubleWriteLogNum;
+ public int getOperationSyncLogNum() {
+ return operationSyncLogNum;
}
- public void setDoubleWriteLogNum(int doubleWriteLogNum) {
- this.doubleWriteLogNum = doubleWriteLogNum;
+ public void setOperationSyncLogNum(int operationSyncLogNum) {
+ this.operationSyncLogNum = operationSyncLogNum;
}
- public long getDoubleWriteMaxLogSize() {
- return doubleWriteMaxLogSize;
+ public long getOperationSyncMaxLogSize() {
+ return operationSyncMaxLogSize;
}
- public void setDoubleWriteMaxLogSize(long doubleWriteMaxLogSize) {
- this.doubleWriteMaxLogSize = doubleWriteMaxLogSize;
+ public void setOperationSyncMaxLogSize(long operationSyncMaxLogSize) {
+ this.operationSyncMaxLogSize = operationSyncMaxLogSize;
}
- public int getDoubleWriteProducerCacheSize() {
- return doubleWriteProducerCacheSize;
+ public int getOperationSyncProducerCacheSize() {
+ return operationSyncProducerCacheSize;
}
- public void setDoubleWriteProducerCacheSize(int doubleWriteProducerCacheSize) {
- this.doubleWriteProducerCacheSize = doubleWriteProducerCacheSize;
+ public void setOperationSyncProducerCacheSize(int operationSyncProducerCacheSize) {
+ this.operationSyncProducerCacheSize = operationSyncProducerCacheSize;
}
- public int getDoubleWriteConsumerConcurrencySize() {
- return doubleWriteConsumerConcurrencySize;
+ public int getOperationSyncConsumerConcurrencySize() {
+ return operationSyncConsumerConcurrencySize;
}
- public void setDoubleWriteConsumerConcurrencySize(int doubleWriteConsumerConcurrencySize) {
- this.doubleWriteConsumerConcurrencySize = doubleWriteConsumerConcurrencySize;
+ public void setOperationSyncConsumerConcurrencySize(int operationSyncConsumerConcurrencySize) {
+ this.operationSyncConsumerConcurrencySize = operationSyncConsumerConcurrencySize;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 848b35e4d5..21228f3cd7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -154,8 +154,8 @@ public class IoTDBConstant {
public static final String UDF_FOLDER_NAME = "udf";
public static final String TRIGGER_FOLDER_NAME = "trigger";
- // doublewrite folder name
- public static final String DOUBLEWRITE_FOLDER_NAME = "doublewrite";
+ // Operation Sync folder name
+ public static final String OPERATION_SYNC_FOLDER_NAME = "operationsync";
// mqtt
public static final String ENABLE_MQTT = "enable_mqtt_service";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index bbb5dfbe6c..2e86b8ff86 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -790,11 +790,11 @@ public class IoTDBDescriptor {
"iotdb_server_encrypt_decrypt_provider_parameter",
conf.getEncryptDecryptProviderParameter()));
- // set DoubleWrite config
- conf.setEnableDoubleWrite(
+ // set OperationSync config
+ conf.setEnableOperationSync(
Boolean.parseBoolean(
properties.getProperty(
- "enable_double_write", String.valueOf(conf.isEnableDoubleWrite()))));
+ "enable_operation_sync", String.valueOf(conf.isEnableOperationSync()))));
conf.setSecondaryAddress(
properties.getProperty("secondary_address", conf.getSecondaryAddress()));
@@ -808,42 +808,43 @@ public class IoTDBDescriptor {
conf.setSecondaryPassword(
properties.getProperty("secondary_password", conf.getSecondaryPassword()));
- conf.setDoubleWriteSessionConcurrencySize(
+ conf.setOperationSyncSessionConcurrencySize(
Integer.parseInt(
properties.getProperty(
- "double_write_session_concurrency_size",
- String.valueOf(conf.getDoubleWriteSessionConcurrencySize()))));
+ "operation_sync_session_concurrency_size",
+ String.valueOf(conf.getOperationSyncSessionConcurrencySize()))));
- conf.setDoubleWriteLogDir(
- properties.getProperty("double_write_log_dir", conf.getDoubleWriteLogDir()));
+ conf.setOperationSyncLogDir(
+ properties.getProperty("operation_sync_log_dir", conf.getOperationSyncLogDir()));
- conf.setDoubleWriteLogValidity(
+ conf.setOperationSyncLogValidity(
Integer.parseInt(
properties.getProperty(
- "double_write_log_file_validity",
- String.valueOf(conf.getDoubleWriteLogValidity()))));
+ "operation_sync_log_file_validity",
+ String.valueOf(conf.getOperationSyncLogValidity()))));
- conf.setDoubleWriteLogNum(
+ conf.setOperationSyncLogNum(
Integer.parseInt(
properties.getProperty(
- "double_write_log_file_num", String.valueOf(conf.getDoubleWriteLogNum()))));
+ "operation_sync_log_file_num", String.valueOf(conf.getOperationSyncLogNum()))));
- conf.setDoubleWriteMaxLogSize(
+ conf.setOperationSyncMaxLogSize(
Long.parseLong(
properties.getProperty(
- "double_write_max_log_size", String.valueOf(conf.getDoubleWriteMaxLogSize()))));
+ "operation_sync_max_log_size",
+ String.valueOf(conf.getOperationSyncMaxLogSize()))));
- conf.setDoubleWriteProducerCacheSize(
+ conf.setOperationSyncProducerCacheSize(
Integer.parseInt(
properties.getProperty(
- "double_write_producer_cache_size",
- String.valueOf(conf.getDoubleWriteProducerCacheSize()))));
+ "operation_sync_producer_cache_size",
+ String.valueOf(conf.getOperationSyncProducerCacheSize()))));
- conf.setDoubleWriteConsumerConcurrencySize(
+ conf.setOperationSyncConsumerConcurrencySize(
Integer.parseInt(
properties.getProperty(
- "double_write_consumer_concurrency_size",
- String.valueOf(conf.getDoubleWriteConsumerConcurrencySize()))));
+ "operation_sync_consumer_concurrency_size",
+ String.valueOf(conf.getOperationSyncConsumerConcurrencySize()))));
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index 2d0a3da959..064415ab79 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.pool.SessionPool;
@@ -30,40 +30,40 @@ import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
-public class DoubleWriteConsumer implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteConsumer.class);
+public class OperationSyncConsumer implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
- private final BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>>
- doubleWriteQueue;
- private final SessionPool doubleWriteSessionPool;
- private final DoubleWriteLogService niLogService;
+ private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ OperationSyncQueue;
+ private final SessionPool operationSyncSessionPool;
+ private final OperationSyncLogService dmlLogService;
- public DoubleWriteConsumer(
- BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>>
- doubleWriteQueue,
- SessionPool doubleWriteSessionPool,
- DoubleWriteLogService niLogService) {
- this.doubleWriteQueue = doubleWriteQueue;
- this.doubleWriteSessionPool = doubleWriteSessionPool;
- this.niLogService = niLogService;
+ public OperationSyncConsumer(
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ OperationSyncQueue,
+ SessionPool operationSyncSessionPool,
+ OperationSyncLogService dmlLogService) {
+ this.OperationSyncQueue = OperationSyncQueue;
+ this.operationSyncSessionPool = operationSyncSessionPool;
+ this.dmlLogService = dmlLogService;
}
@Override
public void run() {
while (true) {
- Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType> head;
+ Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> head;
ByteBuffer headBuffer;
- DoubleWritePlanTypeUtils.DoubleWritePlanType headType;
+ OperationSyncPlanTypeUtils.OperationSyncPlanType headType;
try {
- head = doubleWriteQueue.take();
+ head = OperationSyncQueue.take();
headBuffer = head.left;
headType = head.right;
} catch (InterruptedException e) {
- LOGGER.error("DoubleWriteConsumer been interrupted: ", e);
+ LOGGER.error("OperationSyncConsumer been interrupted: ", e);
continue;
}
- if (headType == DoubleWritePlanTypeUtils.DoubleWritePlanType.IPlan) {
+ if (headType == OperationSyncPlanTypeUtils.OperationSyncPlanType.IPlan) {
try {
// Sleep 10ms when it's I-Plan
TimeUnit.MILLISECONDS.sleep(10);
@@ -76,14 +76,14 @@ public class DoubleWriteConsumer implements Runnable {
boolean transmitStatus = false;
try {
headBuffer.position(0);
- transmitStatus = doubleWriteSessionPool.doubleWriteTransmit(headBuffer);
+ transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and do serialization
LOGGER.warn(
- "DoubleWriteConsumer can't transmit because network failure", connectionException);
+ "OperationSyncConsumer can't transmit because network failure", connectionException);
} catch (Exception e) {
// The PhysicalPlan has internal error, reject transmit
- LOGGER.error("DoubleWriteConsumer can't transmit", e);
+ LOGGER.error("OperationSyncConsumer can't transmit", e);
continue;
}
@@ -91,12 +91,12 @@ public class DoubleWriteConsumer implements Runnable {
try {
// must set buffer position to limit() before serialization
headBuffer.position(headBuffer.limit());
- niLogService.acquireLogWriter();
- niLogService.write(headBuffer);
+ dmlLogService.acquireLogWriter();
+ dmlLogService.write(headBuffer);
} catch (IOException e) {
- LOGGER.error("DoubleWriteConsumer can't serialize physicalPlan", e);
+ LOGGER.error("OperationSyncConsumer can't serialize physicalPlan", e);
}
- niLogService.releaseLogWriter();
+ dmlLogService.releaseLogWriter();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
similarity index 71%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index 81840d6eee..b2e70c5042 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteEProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.rpc.IoTDBConnectionException;
@@ -28,15 +28,15 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
-public class DoubleWriteEProtector extends DoubleWriteProtector {
+public class OperationSyncDDLProtector extends OperationSyncProtector {
- private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteEProtector.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);
- private final SessionPool doubleWriteSessionPool;
+ private final SessionPool operationSyncSessionPool;
- public DoubleWriteEProtector(SessionPool doubleWriteSessionPool) {
+ public OperationSyncDDLProtector(SessionPool operationSyncSessionPool) {
super();
- this.doubleWriteSessionPool = doubleWriteSessionPool;
+ this.operationSyncSessionPool = operationSyncSessionPool;
}
@Override
@@ -51,15 +51,15 @@ public class DoubleWriteEProtector extends DoubleWriteProtector {
boolean transmitStatus = false;
try {
- // try double write
+ // try operation sync
planBuffer.position(0);
- transmitStatus = doubleWriteSessionPool.doubleWriteTransmit(planBuffer);
+ transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and retry
- LOGGER.warn("DoubleWriteEProtector can't transmit, retrying...", connectionException);
+ LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
} catch (Exception e) {
// error exception and break
- LOGGER.error("DoubleWriteEProtector can't transmit", e);
+ LOGGER.error("OperationSyncDDLProtector can't transmit", e);
break;
}
@@ -69,7 +69,7 @@ public class DoubleWriteEProtector extends DoubleWriteProtector {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
- LOGGER.warn("DoubleWriteEProtector is interrupted", e);
+ LOGGER.warn("OperationSyncDDLProtector is interrupted", e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
similarity index 72%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index c6c0fd09e8..5668d918f7 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteNIProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -24,20 +24,21 @@ import org.apache.iotdb.tsfile.utils.Pair;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
-public class DoubleWriteNIProtector extends DoubleWriteProtector {
+public class OperationSyncDMLProtector extends OperationSyncProtector {
- private final DoubleWriteEProtector eProtector;
- private final DoubleWriteProducer producer;
+ private final OperationSyncDDLProtector ddlProtector;
+ private final OperationSyncProducer producer;
- public DoubleWriteNIProtector(DoubleWriteEProtector eProtector, DoubleWriteProducer producer) {
+ public OperationSyncDMLProtector(
+ OperationSyncDDLProtector ddlProtector, OperationSyncProducer producer) {
super();
- this.eProtector = eProtector;
+ this.ddlProtector = ddlProtector;
this.producer = producer;
}
@Override
protected void preCheck() {
- while (eProtector.isAtWork()) {
+ while (ddlProtector.isAtWork()) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ignore) {
@@ -49,6 +50,6 @@ public class DoubleWriteNIProtector extends DoubleWriteProtector {
@Override
protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
producer.put(
- new Pair<>(planBuffer, DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan)));
+ new Pair<>(planBuffer, OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan)));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
index 70df59f663..4d8a4e004d 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteLogService.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -34,22 +34,22 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public class DoubleWriteLogService implements Runnable {
+public class OperationSyncLogService implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteLogService.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncLogService.class);
private static final String logFileDir =
- IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogDir();
+ IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogDir();
private static final long logFileValidity =
- IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogValidity() * 1000L;
+ IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity() * 1000L;
private static final int maxLogFileNum =
- IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogNum();
+ IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogNum();
private static final long maxLogFileSize =
- IoTDBDescriptor.getInstance().getConfig().getDoubleWriteMaxLogSize();
+ IoTDBDescriptor.getInstance().getConfig().getOperationSyncMaxLogSize();
private static long currentLogFileSize = 0;
- private final DoubleWriteProtector protector;
+ private final OperationSyncProtector protector;
private final Lock logWriterLock;
private final String logFileName;
private int logFileID;
@@ -57,7 +57,7 @@ public class DoubleWriteLogService implements Runnable {
private File logFile;
private LogWriter logWriter;
- public DoubleWriteLogService(String logFileName, DoubleWriteProtector protector) {
+ public OperationSyncLogService(String logFileName, OperationSyncProtector protector) {
this.logFileName = logFileName;
this.protector = protector;
@@ -68,7 +68,7 @@ public class DoubleWriteLogService implements Runnable {
File logDir = new File(logFileDir);
if (!logDir.exists()) {
if (!logDir.mkdirs()) {
- LOGGER.error("Can't make DoubleWriteLog file dir: {}", logDir.getAbsolutePath());
+ LOGGER.error("Can't make OperationSyncLog file dir: {}", logDir.getAbsolutePath());
}
}
}
@@ -126,7 +126,7 @@ public class DoubleWriteLogService implements Runnable {
// Sleep 10s before next check
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
- LOGGER.error("DoubleWriteLogService been interrupted", e);
+ LOGGER.error("OperationSyncLogService been interrupted", e);
}
}
}
@@ -143,7 +143,7 @@ public class DoubleWriteLogService implements Runnable {
try {
logWriter.close();
} catch (IOException e) {
- LOGGER.warn("Can't close DoubleWriteLog: {}, retrying...", logFile.getAbsolutePath());
+ LOGGER.warn("Can't close OperationSyncLog: {}, retrying...", logFile.getAbsolutePath());
try {
// Sleep 1s and retry
TimeUnit.SECONDS.sleep(1);
@@ -153,7 +153,7 @@ public class DoubleWriteLogService implements Runnable {
continue;
}
- LOGGER.info("DoubleWriteLog: {} is expired and closed", logFile.getAbsolutePath());
+ LOGGER.info("OperationSyncLog: {} is expired and closed", logFile.getAbsolutePath());
break;
}
@@ -175,11 +175,11 @@ public class DoubleWriteLogService implements Runnable {
if (logFile.createNewFile()) {
logFileCreateTime = System.currentTimeMillis();
logWriter = new LogWriter(logFile, false);
- LOGGER.info("Create DoubleWriteLog: {}", logFile.getAbsolutePath());
+ LOGGER.info("Create OperationSyncLog: {}", logFile.getAbsolutePath());
break;
}
} catch (IOException e) {
- LOGGER.warn("Can't create DoubleWriteLog: {}, retrying...", logFile.getAbsolutePath());
+ LOGGER.warn("Can't create OperationSyncLog: {}, retrying...", logFile.getAbsolutePath());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
similarity index 78%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
index 3573d5f973..404cdf63d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWritePlanTypeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
@@ -16,11 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -28,25 +29,26 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
-public class DoubleWritePlanTypeUtils {
+public class OperationSyncPlanTypeUtils {
- public static DoubleWritePlanType getDoubleWritePlanType(PhysicalPlan plan) {
+ public static OperationSyncPlanType getOperationSyncPlanType(PhysicalPlan plan) {
if (plan instanceof SetStorageGroupPlan
|| plan instanceof DeleteStorageGroupPlan
|| plan instanceof CreateTimeSeriesPlan
|| plan instanceof CreateMultiTimeSeriesPlan
|| plan instanceof CreateAlignedTimeSeriesPlan
- || plan instanceof DeleteTimeSeriesPlan) {
- return DoubleWritePlanType.EPlan;
+ || plan instanceof DeleteTimeSeriesPlan
+ || plan instanceof AlterTimeSeriesPlan) {
+ return OperationSyncPlanType.EPlan;
} else if (plan instanceof DeletePlan) {
- return DoubleWritePlanType.IPlan;
+ return OperationSyncPlanType.IPlan;
} else if (plan instanceof InsertPlan) {
- return DoubleWritePlanType.NPlan;
+ return OperationSyncPlanType.NPlan;
}
return null;
}
- public enum DoubleWritePlanType {
+ public enum OperationSyncPlanType {
EPlan,
IPlan,
NPlan
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
similarity index 57%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
index e024487ba2..2f23b97e05 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -27,28 +27,28 @@ import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
/**
- * DoubleWriteProducer using BlockingQueue to cache PhysicalPlan. And persist some PhysicalPlan when
- * they are too many to transmit
+ * OperationSyncProducer uses a BlockingQueue to cache PhysicalPlans, and persists some of them
+ * when there are too many to transmit
*/
-public class DoubleWriteProducer {
+public class OperationSyncProducer {
- private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteProducer.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProducer.class);
- private final BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>>
- doubleWriteQueue;
+ private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ operationSyncQueue;
- public DoubleWriteProducer(
- BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>>
- doubleWriteQueue) {
- this.doubleWriteQueue = doubleWriteQueue;
+ public OperationSyncProducer(
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ operationSyncQueue) {
+ this.operationSyncQueue = operationSyncQueue;
}
- public void put(Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType> planPair) {
+ public void put(Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair) {
try {
planPair.left.position(0);
- doubleWriteQueue.put(planPair);
+ operationSyncQueue.put(planPair);
} catch (InterruptedException e) {
- LOGGER.error("double write cache failed.", e);
+ LOGGER.error("OperationSync cache failed.", e);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
similarity index 81%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
index 6eb00355c4..f84a976636 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
@@ -38,11 +38,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-public abstract class DoubleWriteProtector implements Runnable {
+public abstract class OperationSyncProtector implements Runnable {
- protected static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteProtector.class);
+ protected static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProtector.class);
protected static final int logFileValidity =
- IoTDBDescriptor.getInstance().getConfig().getDoubleWriteLogValidity();
+ IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity();
// For transmit log files
protected final Lock logFileListLock;
@@ -57,7 +57,7 @@ public abstract class DoubleWriteProtector implements Runnable {
// Working state
protected volatile boolean isProtectorAtWork;
- protected DoubleWriteProtector() {
+ protected OperationSyncProtector() {
logFileListLock = new ReentrantLock();
registeredLogFiles = new ArrayList<>();
@@ -82,7 +82,7 @@ public abstract class DoubleWriteProtector implements Runnable {
public void run() {
while (true) {
while (true) {
- // Wrap and transmit all DoubleWriteLogs
+ // Wrap and transmit all OperationSyncLogs
logFileListLock.lock();
if (registeredLogFiles.size() > 0) {
isProtectorAtWork = true;
@@ -102,7 +102,7 @@ public abstract class DoubleWriteProtector implements Runnable {
// Sleep a while before next check
TimeUnit.SECONDS.sleep(logFileValidity);
} catch (InterruptedException e) {
- LOGGER.warn("DoubleWriteProtector been interrupted", e);
+ LOGGER.warn("OperationSyncProtector been interrupted", e);
}
}
}
@@ -116,7 +116,7 @@ public abstract class DoubleWriteProtector implements Runnable {
logReader = new SingleFileLogReader(logFile);
} catch (FileNotFoundException e) {
LOGGER.error(
- "DoubleWriteProtector can't open DoubleWriteLog: {}, discarded",
+ "OperationSyncProtector can't open OperationSyncLog: {}, discarded",
logFile.getAbsolutePath(),
e);
continue;
@@ -128,7 +128,7 @@ public abstract class DoubleWriteProtector implements Runnable {
try {
nextPlan.serialize(protectorDeserializeStream);
} catch (IOException e) {
- LOGGER.error("DoubleWriteProtector can't serialize PhysicalPlan", e);
+ LOGGER.error("OperationSyncProtector can't serialize PhysicalPlan", e);
continue;
}
ByteBuffer nextBuffer = ByteBuffer.wrap(protectorByteStream.toByteArray());
@@ -138,27 +138,27 @@ public abstract class DoubleWriteProtector implements Runnable {
logReader.close();
try {
- // sleep one second then delete DoubleWriteLog
+ // sleep one second then delete OperationSyncLog
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
- LOGGER.warn("DoubleWriteProtector is interrupted", e);
+ LOGGER.warn("OperationSyncProtector is interrupted", e);
}
- DoubleWriteLogService.incLogFileSize(-logFile.length());
+ OperationSyncLogService.incLogFileSize(-logFile.length());
boolean deleted = false;
for (int retryCnt = 0; retryCnt < 5; retryCnt++) {
if (logFile.delete()) {
deleted = true;
- LOGGER.info("DoubleWriteLog: {} is deleted.", logFile.getAbsolutePath());
+ LOGGER.info("OperationSyncLog: {} is deleted.", logFile.getAbsolutePath());
break;
} else {
- LOGGER.warn("Delete DoubleWriteLog: {} failed. Retrying", logFile.getAbsolutePath());
+ LOGGER.warn("Delete OperationSyncLog: {} failed. Retrying", logFile.getAbsolutePath());
}
}
if (!deleted) {
- DoubleWriteLogService.incLogFileSize(logFile.length());
- LOGGER.error("Couldn't delete DoubleWriteLog: {}", logFile.getAbsolutePath());
+ OperationSyncLogService.incLogFileSize(logFile.length());
+ LOGGER.error("Couldn't delete OperationSyncLog: {}", logFile.getAbsolutePath());
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
similarity index 69%
rename from server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
rename to server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
index 3bcef6bf58..42b11e4d30 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublewrite/DoubleWriteTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncWriteTask.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.pool.SessionPool;
@@ -27,22 +27,22 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
-/** DoubleWriteTask is used for transmit one E-Plan sending by a client */
-public class DoubleWriteTask implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(DoubleWriteTask.class);
+/** OperationSyncWriteTask is used to transmit one E-Plan sent by a client */
+public class OperationSyncWriteTask implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncWriteTask.class);
private final ByteBuffer physicalPlanBuffer;
- private final SessionPool doubleWriteSessionPool;
- private final DoubleWriteEProtector eProtector;
- private final DoubleWriteLogService eLogService;
+ private final SessionPool operationSyncSessionPool;
+ private final OperationSyncDDLProtector eProtector;
+ private final OperationSyncLogService eLogService;
- public DoubleWriteTask(
+ public OperationSyncWriteTask(
ByteBuffer physicalPlanBuffer,
- SessionPool doubleWriteSessionPool,
- DoubleWriteEProtector eProtector,
- DoubleWriteLogService eLogService) {
+ SessionPool operationSyncSessionPool,
+ OperationSyncDDLProtector eProtector,
+ OperationSyncLogService eLogService) {
this.physicalPlanBuffer = physicalPlanBuffer;
- this.doubleWriteSessionPool = doubleWriteSessionPool;
+ this.operationSyncSessionPool = operationSyncSessionPool;
this.eProtector = eProtector;
this.eLogService = eLogService;
}
@@ -55,13 +55,14 @@ public class DoubleWriteTask implements Runnable {
boolean transmitStatus = false;
try {
physicalPlanBuffer.position(0);
- transmitStatus = doubleWriteSessionPool.doubleWriteTransmit(physicalPlanBuffer);
+ transmitStatus = operationSyncSessionPool.operationSyncTransmit(physicalPlanBuffer);
} catch (IoTDBConnectionException connectionException) {
// warn IoTDBConnectionException and do serialization
- LOGGER.warn("DoubleWriteTask can't transmit because network failure", connectionException);
+ LOGGER.warn(
+ "OperationSyncWriteTask can't transmit because network failure", connectionException);
} catch (Exception e) {
// The PhysicalPlan has internal error, reject transmit
- LOGGER.error("DoubleWriteTask can't transmit", e);
+ LOGGER.error("OperationSyncWriteTask can't transmit", e);
return;
}
if (!transmitStatus) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index 80ffcbb14f..5185338895 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -25,13 +25,13 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
-import org.apache.iotdb.db.doublewrite.DoubleWriteConsumer;
-import org.apache.iotdb.db.doublewrite.DoubleWriteEProtector;
-import org.apache.iotdb.db.doublewrite.DoubleWriteLogService;
-import org.apache.iotdb.db.doublewrite.DoubleWriteNIProtector;
-import org.apache.iotdb.db.doublewrite.DoubleWritePlanTypeUtils;
-import org.apache.iotdb.db.doublewrite.DoubleWriteProducer;
-import org.apache.iotdb.db.doublewrite.DoubleWriteTask;
+import org.apache.iotdb.db.doublelive.OperationSyncConsumer;
+import org.apache.iotdb.db.doublelive.OperationSyncDDLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncDMLProtector;
+import org.apache.iotdb.db.doublelive.OperationSyncLogService;
+import org.apache.iotdb.db.doublelive.OperationSyncPlanTypeUtils;
+import org.apache.iotdb.db.doublelive.OperationSyncProducer;
+import org.apache.iotdb.db.doublelive.OperationSyncWriteTask;
import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
import org.apache.iotdb.db.exception.IoTDBException;
import org.apache.iotdb.db.exception.QueryInBatchStatementException;
@@ -113,10 +113,10 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
@@ -316,23 +316,23 @@ public class TSServiceImpl implements TSIService.Iface {
protected final ServiceProvider serviceProvider;
- /* Double write module */
- private static final boolean isEnableDoubleWrite =
- IoTDBDescriptor.getInstance().getConfig().isEnableDoubleWrite();
- private final SessionPool doubleWriteSessionPool;
- private final DoubleWriteProducer doubleWriteProducer;
- private final DoubleWriteEProtector doubleWriteEProtector;
- private final DoubleWriteLogService doubleWriteELogService;
+ /* OperationSync module */
+ private static final boolean isEnableOperationSync =
+ IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
+ private final SessionPool operationSyncsessionPool;
+ private final OperationSyncProducer operationSyncProducer;
+ private final OperationSyncDDLProtector operationSyncDDLProtector;
+ private final OperationSyncLogService operationSyncDDLLogService;
public TSServiceImpl() {
super();
serviceProvider = IoTDB.serviceProvider;
- if (isEnableDoubleWrite) {
- /* Open double write */
+ if (isEnableOperationSync) {
+ /* Open OperationSync */
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
- // create SessionPool for double write
- doubleWriteSessionPool =
+ // create SessionPool for OperationSync
+ operationSyncsessionPool =
new SessionPool(
config.getSecondaryAddress(),
config.getSecondaryPort(),
@@ -340,36 +340,38 @@ public class TSServiceImpl implements TSIService.Iface {
config.getSecondaryPassword(),
5);
- // create DoubleWriteEProtector and DoubleWriteELogService
- doubleWriteEProtector = new DoubleWriteEProtector(doubleWriteSessionPool);
- new Thread(doubleWriteEProtector).start();
- doubleWriteELogService = new DoubleWriteLogService("ELog", doubleWriteEProtector);
- new Thread(doubleWriteELogService).start();
-
- // create DoubleWriteProducer
- BlockingQueue<Pair<ByteBuffer, DoubleWritePlanTypeUtils.DoubleWritePlanType>> blockingQueue =
- new ArrayBlockingQueue<>(config.getDoubleWriteProducerCacheSize());
- doubleWriteProducer = new DoubleWriteProducer(blockingQueue);
-
- // create DoubleWriteNIProtector and DoubleWriteNILogService
- DoubleWriteNIProtector doubleWriteNIProtector =
- new DoubleWriteNIProtector(doubleWriteEProtector, doubleWriteProducer);
- new Thread(doubleWriteNIProtector).start();
- DoubleWriteLogService doubleWriteNILogService =
- new DoubleWriteLogService("NILog", doubleWriteNIProtector);
- new Thread(doubleWriteNILogService).start();
-
- // create DoubleWriteConsumer
- for (int i = 0; i < config.getDoubleWriteConsumerConcurrencySize(); i++) {
- DoubleWriteConsumer consumer =
- new DoubleWriteConsumer(blockingQueue, doubleWriteSessionPool, doubleWriteNILogService);
+ // create operationSyncDDLProtector and operationSyncDDLLogService
+ operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
+ new Thread(operationSyncDDLProtector).start();
+ operationSyncDDLLogService =
+ new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
+ new Thread(operationSyncDDLLogService).start();
+
+ // create OperationSyncProducer
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+ operationSyncProducer = new OperationSyncProducer(blockingQueue);
+
+ // create OperationSyncDMLProtector and OperationSyncDMLLogService
+ OperationSyncDMLProtector operationSyncDMLProtector =
+ new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
+ new Thread(operationSyncDMLProtector).start();
+ OperationSyncLogService operationSyncDMLLogService =
+ new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
+ new Thread(operationSyncDMLLogService).start();
+
+ // create OperationSyncConsumer
+ for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
+ OperationSyncConsumer consumer =
+ new OperationSyncConsumer(
+ blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
new Thread(consumer).start();
}
} else {
- doubleWriteSessionPool = null;
- doubleWriteProducer = null;
- doubleWriteEProtector = null;
- doubleWriteELogService = null;
+ operationSyncsessionPool = null;
+ operationSyncProducer = null;
+ operationSyncDDLProtector = null;
+ operationSyncDDLLogService = null;
}
}
@@ -2138,24 +2140,26 @@ public class TSServiceImpl implements TSIService.Iface {
}
@Override
- public TSStatus executeDoubleWrite(TSInternalSyncWriteReq req) {
- PhysicalPlan physicalPlan = null;
+ public TSStatus executeOperationSync(TSOperationSyncWriteReq req) {
+ PhysicalPlan physicalPlan;
try {
ByteBuffer planBuffer = req.physicalPlan;
planBuffer.position(0);
physicalPlan = PhysicalPlan.Factory.create(req.physicalPlan);
} catch (IllegalPathException | IOException e) {
- LOGGER.error("double write deserialization failed.", e);
+ LOGGER.error("OperationSync deserialization failed.", e);
+ return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
+ .setMessage(e.getMessage());
}
- DoubleWritePlanTypeUtils.DoubleWritePlanType planType =
- DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan);
+ OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+ OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
if (planType == null) {
LOGGER.error(
- "DoubleWrite receive unsupported PhysicalPlan type: {}", physicalPlan.getOperatorName());
+ "OperationSync receive unsupported PhysicalPlan type: {}",
+ physicalPlan.getOperatorName());
return RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}
- // LOGGER.info("DoubleWrite receive:{}", physicalPlan.getPaths().toString());
try {
return serviceProvider.executeNonQuery(physicalPlan)
@@ -2166,55 +2170,54 @@ public class TSServiceImpl implements TSIService.Iface {
}
}
- private void transmitDoubleWrite(PhysicalPlan physicalPlan) {
+ private void transmitOperationSync(PhysicalPlan physicalPlan) {
- DoubleWritePlanTypeUtils.DoubleWritePlanType planType =
- DoubleWritePlanTypeUtils.getDoubleWritePlanType(physicalPlan);
+ OperationSyncPlanTypeUtils.OperationSyncPlanType planType =
+ OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan);
if (planType == null) {
- // Don't need DoubleWrite
+ // Don't need OperationSync
return;
}
- // LOGGER.info("DoubleWrite transmit: {}", physicalPlan.getPaths().toString());
-
// serialize physical plan
ByteBuffer buffer;
try {
int size = physicalPlan.getSerializedSize();
- ByteArrayOutputStream doubleWriteByteStream = new ByteArrayOutputStream(size);
- DataOutputStream doubleWriteSerializeStream = new DataOutputStream(doubleWriteByteStream);
- physicalPlan.serialize(doubleWriteSerializeStream);
- buffer = ByteBuffer.wrap(doubleWriteByteStream.toByteArray());
+ ByteArrayOutputStream operationSyncByteStream = new ByteArrayOutputStream(size);
+ DataOutputStream operationSyncSerializeStream = new DataOutputStream(operationSyncByteStream);
+ physicalPlan.serialize(operationSyncSerializeStream);
+ buffer = ByteBuffer.wrap(operationSyncByteStream.toByteArray());
} catch (IOException e) {
- LOGGER.error("DoubleWrite can't serialize PhysicalPlan", e);
+ LOGGER.error("OperationSync can't serialize PhysicalPlan", e);
return;
}
switch (planType) {
case EPlan:
- // Create DoubleWriteTask and wait
+ // Create OperationSyncWriteTask and wait
Thread taskThread =
new Thread(
- new DoubleWriteTask(
- buffer, doubleWriteSessionPool, doubleWriteEProtector, doubleWriteELogService));
+ new OperationSyncWriteTask(
+ buffer, operationSyncsessionPool,
+ operationSyncDDLProtector, operationSyncDDLLogService));
taskThread.start();
try {
taskThread.join();
} catch (InterruptedException e) {
- LOGGER.error("DoubleWriteTask been interrupted", e);
+ LOGGER.error("OperationSyncWriteTask been interrupted", e);
}
break;
case IPlan:
case NPlan:
- // Put into DoubleWriteProducer
- doubleWriteProducer.put(new Pair<>(buffer, planType));
+ // Put into OperationSyncProducer
+ operationSyncProducer.put(new Pair<>(buffer, planType));
}
}
protected TSStatus executeNonQueryPlan(PhysicalPlan plan) {
- if (isEnableDoubleWrite) {
- // DoubleWrite should transmit before execute
- transmitDoubleWrite(plan);
+ if (isEnableOperationSync) {
+ // OperationSync should transmit before execute
+ transmitOperationSync(plan);
}
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java b/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
similarity index 97%
rename from server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
rename to server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
index d7cb8d9b62..cb2db6969c 100644
--- a/server/src/test/java/org/apache/iotdb/db/doublewrite/DoubleWriteManualTestUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/doublelive/OperationSyncManualTestUtils.java
@@ -1,4 +1,4 @@
-package org.apache.iotdb.db.doublewrite;
+package org.apache.iotdb.db.doublelive;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -11,7 +11,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class DoubleWriteManualTestUtils {
+public class OperationSyncManualTestUtils {
private static final SessionPool sessionPool =
new SessionPool("127.0.0.1", 6667, "root", "root", 3);
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 64555f584e..cb483c96ab 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -38,7 +38,7 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
@@ -2461,20 +2461,20 @@ public class Session {
}
}
- /** Transmit insert record request for double write */
- public void doubleWriteTransmit(ByteBuffer buffer)
+ /** Transmit insert record request for operation sync */
+ public void operationSyncTransmit(ByteBuffer buffer)
throws IoTDBConnectionException, StatementExecutionException {
try {
- TSInternalSyncWriteReq request = genTSExecuteDoubleWriteReq(buffer);
- defaultSessionConnection.executeDoubleWrite(request);
+ TSOperationSyncWriteReq request = genTSExecuteOperationSyncReq(buffer);
+ defaultSessionConnection.executeOperationSync(request);
} catch (RedirectException e) {
// ignored
}
}
- private TSInternalSyncWriteReq genTSExecuteDoubleWriteReq(ByteBuffer buffer) {
- TSInternalSyncWriteReq request = new TSInternalSyncWriteReq();
- request.setDoubleWriteType((byte) 0);
+ private TSOperationSyncWriteReq genTSExecuteOperationSyncReq(ByteBuffer buffer) {
+ TSOperationSyncWriteReq request = new TSOperationSyncWriteReq();
+ request.setOperationSyncType((byte) 0);
request.setPhysicalPlan(buffer);
return request;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 9ec01d4c7c..71c1f28c48 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -44,10 +44,10 @@ import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq;
-import org.apache.iotdb.service.rpc.thrift.TSInternalSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSLastDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSOperationSyncWriteReq;
import org.apache.iotdb.service.rpc.thrift.TSPruneSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSQueryTemplateResp;
@@ -943,16 +943,16 @@ public class SessionConnection {
}
}
- protected void executeDoubleWrite(TSInternalSyncWriteReq request)
+ protected void executeOperationSync(TSOperationSyncWriteReq request)
throws IoTDBConnectionException, StatementExecutionException, RedirectException {
request.setSessionId(sessionId);
try {
- RpcUtils.verifySuccessWithRedirection(client.executeDoubleWrite(request));
+ RpcUtils.verifySuccessWithRedirection(client.executeOperationSync(request));
} catch (TException e) {
if (reconnect()) {
try {
request.setSessionId(sessionId);
- RpcUtils.verifySuccess(client.executeDoubleWrite(request));
+ RpcUtils.verifySuccess(client.executeOperationSync(request));
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
diff --git a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
index ad8561bab5..9fa6ff9646 100644
--- a/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
+++ b/session/src/main/java/org/apache/iotdb/session/pool/SessionPool.java
@@ -2258,14 +2258,14 @@ public class SessionPool {
return null;
}
- /** Transmit insert record request for double write */
- public boolean doubleWriteTransmit(ByteBuffer buffer)
+ /** Transmit insert record request for OperationSync */
+ public boolean operationSyncTransmit(ByteBuffer buffer)
throws IoTDBConnectionException, StatementExecutionException {
for (int i = 0; i < RETRY; i++) {
Session session = getSession();
try {
buffer.position(0);
- session.doubleWriteTransmit(buffer);
+ session.operationSyncTransmit(buffer);
putBack(session);
return true;
} catch (IoTDBConnectionException e) {
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 2fdeed1847..0f699b4684 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -416,9 +416,9 @@ struct TSDropSchemaTemplateReq {
2: required string templateName
}
-struct TSInternalSyncWriteReq {
+struct TSOperationSyncWriteReq {
1: required i64 sessionId
- 2: required byte doubleWriteType
+ 2: required byte operationSyncType
3: required binary physicalPlan
}
@@ -513,5 +513,5 @@ service TSIService {
TSStatus dropSchemaTemplate(1:TSDropSchemaTemplateReq req);
- TSStatus executeDoubleWrite(1:TSInternalSyncWriteReq req);
+ TSStatus executeOperationSync(1:TSOperationSyncWriteReq req);
}
\ No newline at end of file