You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/08/23 11:58:41 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] Improve the process of secondary (#6855)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new cafc7f3dde [To rel/0.13] Improve the process of secondary (#6855)
cafc7f3dde is described below
commit cafc7f3dde0c61d1f5dff4d3df26ca7350db033a
Author: ljn55966005 <32...@users.noreply.github.com>
AuthorDate: Tue Aug 23 19:58:35 2022 +0800
[To rel/0.13] Improve the process of secondary (#6855)
---
client-cpp/pom.xml | 4 +-
compile-tools/pom.xml | 6 +-
distribution/pom.xml | 2 +-
example/client-cpp-example/pom.xml | 2 +-
example/trigger/pom.xml | 2 +-
example/udf/pom.xml | 2 +-
grafana-connector/pom.xml | 2 +-
integration/pom.xml | 6 +-
jdbc/pom.xml | 2 +-
pom.xml | 8 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 29 +++--
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 +-
.../iotdb/db/doublelive/OperationSyncConsumer.java | 38 +++---
.../db/doublelive/OperationSyncDDLProtector.java | 35 +++---
.../db/doublelive/OperationSyncDMLProtector.java | 45 +++++++-
.../db/doublelive/OperationSyncLogService.java | 35 +++---
.../db/doublelive/OperationSyncPlanTypeUtils.java | 22 +++-
.../iotdb/db/doublelive/OperationSyncProducer.java | 50 ++++++--
.../db/doublelive/OperationSyncProtector.java | 8 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 128 +++++++++++++++++----
.../qp/physical/sys/CreateContinuousQueryPlan.java | 17 +++
.../qp/physical/sys/DropContinuousQueryPlan.java | 8 ++
23 files changed, 328 insertions(+), 130 deletions(-)
diff --git a/client-cpp/pom.xml b/client-cpp/pom.xml
index b6a0014588..38888eb4ba 100644
--- a/client-cpp/pom.xml
+++ b/client-cpp/pom.xml
@@ -116,8 +116,8 @@
<cmake.root.dir>${project.parent.basedir}/compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
<thrift.exec.absolute.path>${project.parent.basedir}/compile-tools/thrift/target/build/compiler/cpp/bin/${cmake.build.type}/thrift.exe</thrift.exec.absolute.path>
<iotdb.server.script>start-server.bat</iotdb.server.script>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
<profile>
diff --git a/compile-tools/pom.xml b/compile-tools/pom.xml
index fc4d80f6b3..afab6ff8d9 100644
--- a/compile-tools/pom.xml
+++ b/compile-tools/pom.xml
@@ -35,7 +35,7 @@
<cmake-version>3.17.3</cmake-version>
<openssl.include.dir>-Dtrue1=true1</openssl.include.dir>
<bison.executable.dir>-Dtrue1=true1</bison.executable.dir>
- <cmake.build.type />
+ <cmake.build.type/>
</properties>
<modules>
<module>thrift</module>
@@ -138,8 +138,8 @@
<thrift.make.executable>make</thrift.make.executable>
<thrift.compiler.executable>thrift.exe</thrift.compiler.executable>
<gradlew.executable>gradlew.bat</gradlew.executable>
- <boost.include.dir />
- <boost.library.dir />
+ <boost.include.dir/>
+ <boost.library.dir/>
</properties>
</profile>
</profiles>
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 70dd54169d..c1aaa127ef 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -29,7 +29,7 @@
</parent>
<artifactId>iotdb-distribution</artifactId>
<name>IoTDB Distribution</name>
- <modules />
+ <modules/>
<build>
<plugins>
<plugin>
diff --git a/example/client-cpp-example/pom.xml b/example/client-cpp-example/pom.xml
index 703f8fe8ad..9a5705cb36 100644
--- a/example/client-cpp-example/pom.xml
+++ b/example/client-cpp-example/pom.xml
@@ -84,7 +84,7 @@
<properties>
<cmake.generator>Visual Studio 16 2019</cmake.generator>
<cmake.root.dir>${project.parent.basedir}/../compile-tools/thrift/target/cmake-${cmake-version}-win64-x64/</cmake.root.dir>
- <boost.include.dir />
+ <boost.include.dir/>
</properties>
</profile>
<profile>
diff --git a/example/trigger/pom.xml b/example/trigger/pom.xml
index 64145c1682..1b16ca6041 100644
--- a/example/trigger/pom.xml
+++ b/example/trigger/pom.xml
@@ -118,7 +118,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
</configuration>
<executions>
diff --git a/example/udf/pom.xml b/example/udf/pom.xml
index f4d18b2510..4d26ff932f 100644
--- a/example/udf/pom.xml
+++ b/example/udf/pom.xml
@@ -118,7 +118,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
</configuration>
<executions>
diff --git a/grafana-connector/pom.xml b/grafana-connector/pom.xml
index 4a256ad502..91b7616d35 100644
--- a/grafana-connector/pom.xml
+++ b/grafana-connector/pom.xml
@@ -170,7 +170,7 @@
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>${start-class}</mainClass>
</transformer>
diff --git a/integration/pom.xml b/integration/pom.xml
index 90deb1433d..d8e1e66f00 100644
--- a/integration/pom.xml
+++ b/integration/pom.xml
@@ -80,7 +80,7 @@
<id>LocalStandalone</id>
<properties>
<test.includedGroups>org.apache.iotdb.itbase.category.LocalStandaloneTest</test.includedGroups>
- <test.excludedGroups />
+ <test.excludedGroups/>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
@@ -142,7 +142,7 @@
<id>Remote</id>
<properties>
<test.includedGroups>org.apache.iotdb.itbase.category.RemoteTest</test.includedGroups>
- <test.excludedGroups />
+ <test.excludedGroups/>
</properties>
<activation>
<activeByDefault>false</activeByDefault>
@@ -206,7 +206,7 @@
<id>Cluster</id>
<properties>
<test.includedGroups>org.apache.iotdb.itbase.category.ClusterTest</test.includedGroups>
- <test.excludedGroups />
+ <test.excludedGroups/>
</properties>
<activation>
<activeByDefault>false</activeByDefault>
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 12c6bd5dd4..5ad6473ddb 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -203,7 +203,7 @@
</goals>
</pluginExecutionFilter>
<action>
- <ignore />
+ <ignore/>
</action>
</pluginExecution>
</pluginExecutions>
diff --git a/pom.xml b/pom.xml
index 0930fdcd7a..371b4c81cb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,7 +159,7 @@
<sonar.junit.reportPaths>target/surefire-reports,target/failsafe-reports</sonar.junit.reportPaths>
<!-- By default, the argLine is empty-->
<gson.version>2.8.8</gson.version>
- <argLine />
+ <argLine/>
<!-- whether enable compiling the cpp client-->
<client-cpp>false</client-cpp>
<!-- disable enforcer by default-->
@@ -693,7 +693,7 @@
<importOrder>
<order>org.apache.iotdb,,javax,java,\#</order>
</importOrder>
- <removeUnusedImports />
+ <removeUnusedImports/>
</java>
<lineEndings>UNIX</lineEndings>
</configuration>
@@ -768,7 +768,7 @@
<phase>validate</phase>
<configuration>
<rules>
- <dependencyConvergence />
+ <dependencyConvergence/>
</rules>
</configuration>
<goals>
@@ -814,7 +814,7 @@
</requireJavaVersion>
<!-- Disabled for now as it breaks the ability to build single modules -->
<!--reactorModuleConvergence/-->
- <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies" />
+ <banVulnerable implementation="org.sonatype.ossindex.maven.enforcer.BanVulnerableDependencies"/>
</rules>
</configuration>
</execution>
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index adff76ea4e..997ea249d6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -830,8 +830,10 @@ public class IoTDBConfig {
private String secondaryUser = "root";
private String secondaryPassword = "root";
+ private int secondarySessionPoolMaxSize = 10;
+
// The transmitting concurrency size of operation sync SessionPool
- private int OperationSyncSessionConcurrencySize = 8;
+ private int operationSyncSessionConcurrencySize = 8;
// OperationSyncLog dir
private String operationSyncLogDir =
@@ -845,8 +847,9 @@ public class IoTDBConfig {
// OperationSyncProducer DML cache size
private int operationSyncProducerCacheSize = 1024;
- // OperationSyncConsumer concurrency size
- private int operationSyncConsumerConcurrencySize = 4;
+
+ // OperationSyncProducer DML cache number
+ private int operationSyncProducerCacheNum = 8;
// The max record num returned in one schema query.
private int schemaQueryFetchSize = 10000000;
@@ -2662,12 +2665,20 @@ public class IoTDBConfig {
this.secondaryPassword = secondaryPassword;
}
+ public int getSecondarySessionPoolMaxSize() {
+ return secondarySessionPoolMaxSize;
+ }
+
+ public void setSecondarySessionPoolMaxSize(int secondarySessionPoolMaxSize) {
+ this.secondarySessionPoolMaxSize = secondarySessionPoolMaxSize;
+ }
+
public int getOperationSyncSessionConcurrencySize() {
- return OperationSyncSessionConcurrencySize;
+ return operationSyncSessionConcurrencySize;
}
public void setOperationSyncSessionConcurrencySize(int operationSyncSessionConcurrencySize) {
- this.OperationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
+ this.operationSyncSessionConcurrencySize = operationSyncSessionConcurrencySize;
}
public String getOperationSyncLogDir() {
@@ -2710,12 +2721,12 @@ public class IoTDBConfig {
this.operationSyncProducerCacheSize = operationSyncProducerCacheSize;
}
- public int getOperationSyncConsumerConcurrencySize() {
- return operationSyncConsumerConcurrencySize;
+ public int getOperationSyncProducerCacheNum() {
+ return operationSyncProducerCacheNum;
}
- public void setOperationSyncConsumerConcurrencySize(int operationSyncConsumerConcurrencySize) {
- this.operationSyncConsumerConcurrencySize = operationSyncConsumerConcurrencySize;
+ public void setOperationSyncProducerCacheNum(int operationSyncProducerCacheNum) {
+ this.operationSyncProducerCacheNum = operationSyncProducerCacheNum;
}
public int getSchemaQueryFetchSize() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 21228f3cd7..1294002707 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -39,6 +39,7 @@ public class IoTDBConstant {
public static final String AUDIT_LOGGER_NAME = "IoTDB_AUDIT_LOGGER";
public static final String SLOW_SQL_LOGGER_NAME = "SLOW_SQL";
public static final String COMPACTION_LOGGER_NAME = "COMPACTION";
+ public static final String DOUBLE_LIVE_LOGGER_NAME = "DOUBLE_LIVE";
public static final String IOTDB_JMX_PORT = "iotdb.jmx.port";
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c277e9bc6a..46789e8aa7 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -839,11 +839,11 @@ public class IoTDBDescriptor {
"operation_sync_producer_cache_size",
String.valueOf(conf.getOperationSyncProducerCacheSize()))));
- conf.setOperationSyncConsumerConcurrencySize(
+ conf.setOperationSyncProducerCacheNum(
Integer.parseInt(
properties.getProperty(
- "operation_sync_consumer_concurrency_size",
- String.valueOf(conf.getOperationSyncConsumerConcurrencySize()))));
+ "operation_sync_producer_cache_num",
+ String.valueOf(conf.getOperationSyncProducerCacheNum()))));
conf.setSchemaQueryFetchSize(
Integer.parseInt(
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
index 3f05766d53..cdb23d24eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncConsumer.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.doublelive;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.pool.SessionPool;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -33,16 +34,16 @@ public class OperationSyncConsumer implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncConsumer.class);
private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- OperationSyncQueue;
+ operationSyncQueue;
private final SessionPool operationSyncSessionPool;
private final OperationSyncLogService dmlLogService;
public OperationSyncConsumer(
BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- OperationSyncQueue,
+ operationSyncQueue,
SessionPool operationSyncSessionPool,
OperationSyncLogService dmlLogService) {
- this.OperationSyncQueue = OperationSyncQueue;
+ this.operationSyncQueue = operationSyncQueue;
this.operationSyncSessionPool = operationSyncSessionPool;
this.dmlLogService = dmlLogService;
}
@@ -52,31 +53,28 @@ public class OperationSyncConsumer implements Runnable {
while (true) {
Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> head;
ByteBuffer headBuffer;
- OperationSyncPlanTypeUtils.OperationSyncPlanType headType;
try {
- head = OperationSyncQueue.take();
+ head = operationSyncQueue.take();
headBuffer = head.left;
- headType = head.right;
} catch (InterruptedException e) {
LOGGER.error("OperationSyncConsumer been interrupted: ", e);
continue;
}
-
- headBuffer.position(0);
boolean transmitStatus = false;
- try {
- headBuffer.position(0);
- transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
- } catch (IoTDBConnectionException connectionException) {
- // warn IoTDBConnectionException and do serialization
- LOGGER.warn(
- "OperationSyncConsumer can't transmit because network failure", connectionException);
- } catch (Exception e) {
- // The PhysicalPlan has internal error, reject transmit
- LOGGER.error("OperationSyncConsumer can't transmit", e);
- continue;
+ if (StorageEngine.isSecondaryAlive().get()) {
+ try {
+ headBuffer.position(0);
+ transmitStatus = operationSyncSessionPool.operationSyncTransmit(headBuffer);
+ } catch (IoTDBConnectionException connectionException) {
+ // warn IoTDBConnectionException and do serialization
+ LOGGER.warn(
+ "OperationSyncConsumer can't transmit because network failure", connectionException);
+ } catch (Exception e) {
+ // The PhysicalPlan has internal error, reject transmit
+ LOGGER.error("OperationSyncConsumer can't transmit", e);
+ continue;
+ }
}
-
if (!transmitStatus) {
try {
// must set buffer position to limit() before serialization
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
index b2e70c5042..1870145980 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDDLProtector.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.doublelive;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.pool.SessionPool;
@@ -30,7 +31,7 @@ import java.util.concurrent.TimeUnit;
public class OperationSyncDDLProtector extends OperationSyncProtector {
- private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncDDLProtector.class);;
private final SessionPool operationSyncSessionPool;
@@ -49,20 +50,26 @@ public class OperationSyncDDLProtector extends OperationSyncProtector {
while (true) {
// transmit E-Plan until it's been received
boolean transmitStatus = false;
-
- try {
- // try operation sync
- planBuffer.position(0);
- transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
- } catch (IoTDBConnectionException connectionException) {
- // warn IoTDBConnectionException and retry
- LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
- } catch (Exception e) {
- // error exception and break
- LOGGER.error("OperationSyncDDLProtector can't transmit", e);
- break;
+ if (StorageEngine.isSecondaryAlive().get()) {
+ try {
+ // try operation sync
+ planBuffer.position(0);
+ transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
+ } catch (IoTDBConnectionException connectionException) {
+ // warn IoTDBConnectionException and retry
+ LOGGER.warn("OperationSyncDDLProtector can't transmit, retrying...", connectionException);
+ } catch (Exception e) {
+ // error exception and break
+ LOGGER.error("OperationSyncDDLProtector can't transmit", e);
+ break;
+ }
+ } else {
+ try {
+ TimeUnit.SECONDS.sleep(30);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
-
if (transmitStatus) {
break;
} else {
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
index 5668d918f7..2c2c89ef94 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncDMLProtector.java
@@ -18,8 +18,10 @@
*/
package org.apache.iotdb.db.doublelive;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.session.pool.SessionPool;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
@@ -27,13 +29,13 @@ import java.util.concurrent.TimeUnit;
public class OperationSyncDMLProtector extends OperationSyncProtector {
private final OperationSyncDDLProtector ddlProtector;
- private final OperationSyncProducer producer;
+ private final SessionPool operationSyncSessionPool;
public OperationSyncDMLProtector(
- OperationSyncDDLProtector ddlProtector, OperationSyncProducer producer) {
+ OperationSyncDDLProtector ddlProtector, SessionPool operationSyncSessionPool) {
super();
this.ddlProtector = ddlProtector;
- this.producer = producer;
+ this.operationSyncSessionPool = operationSyncSessionPool;
}
@Override
@@ -49,7 +51,38 @@ public class OperationSyncDMLProtector extends OperationSyncProtector {
@Override
protected void transmitPhysicalPlan(ByteBuffer planBuffer, PhysicalPlan physicalPlan) {
- producer.put(
- new Pair<>(planBuffer, OperationSyncPlanTypeUtils.getOperationSyncPlanType(physicalPlan)));
+ while (true) {
+ // transmit E-Plan until it's been received
+ boolean transmitStatus = false;
+ if (StorageEngine.isSecondaryAlive().get()) {
+ try {
+ // try operation sync
+ planBuffer.position(0);
+ transmitStatus = operationSyncSessionPool.operationSyncTransmit(planBuffer);
+ } catch (IoTDBConnectionException connectionException) {
+ // warn IoTDBConnectionException and retry
+ LOGGER.warn("OperationSyncDMLProtector can't transmit, retrying...", connectionException);
+ } catch (Exception e) {
+ // error exception and break
+ LOGGER.error("OperationSyncDMLProtector can't transmit", e);
+ break;
+ }
+ } else {
+ try {
+ TimeUnit.SECONDS.sleep(30);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ if (transmitStatus) {
+ break;
+ } else {
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ LOGGER.warn("OperationSyncDMLProtector is interrupted", e);
+ }
+ }
+ }
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
index a11d3fee28..dc061c167e 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncLogService.java
@@ -38,13 +38,13 @@ public class OperationSyncLogService implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncLogService.class);
- private static final String logFileDir =
+ private static final String LOG_FILE_DIR =
IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogDir();
- private static final long logFileValidity =
+ private static final long LOG_FILE_VALIDITY =
IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity() * 1000L;
- private static final int maxLogFileNum =
+ private static final int MAX_LOG_FILE_NUM =
IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogNum();
- private static final long maxLogFileSize =
+ private static final long MAX_LOG_FILE_SIZE =
IoTDBDescriptor.getInstance().getConfig().getOperationSyncMaxLogSize();
private static long currentLogFileSize = 0;
@@ -65,7 +65,7 @@ public class OperationSyncLogService implements Runnable {
this.logFile = null;
this.logWriter = null;
- File logDir = new File(logFileDir);
+ File logDir = new File(LOG_FILE_DIR);
if (!logDir.exists()) {
if (!logDir.mkdirs()) {
LOGGER.error("Can't make OperationSyncLog file dir: {}", logDir.getAbsolutePath());
@@ -77,9 +77,9 @@ public class OperationSyncLogService implements Runnable {
public void run() {
// Check if there exists remnant logs
List<Integer> logFileIDList = new ArrayList<>();
- for (int ID = 0; ID < maxLogFileNum; ID++) {
+ for (int ID = 0; ID < MAX_LOG_FILE_NUM; ID++) {
File file =
- SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + logFileName + ID);
+ SystemFileFactory.INSTANCE.getFile(LOG_FILE_DIR + File.separator + logFileName + ID);
if (file.exists()) {
logFileIDList.add(ID);
}
@@ -96,10 +96,12 @@ public class OperationSyncLogService implements Runnable {
}
for (int i = firstID; i < logFileIDList.size(); i++) {
- protector.registerLogFile(logFileDir + File.separator + logFileName + logFileIDList.get(i));
+ protector.registerLogFile(
+ LOG_FILE_DIR + File.separator + logFileName + logFileIDList.get(i));
}
for (int i = 0; i < firstID; i++) {
- protector.registerLogFile(logFileDir + File.separator + logFileName + logFileIDList.get(i));
+ protector.registerLogFile(
+ LOG_FILE_DIR + File.separator + logFileName + logFileIDList.get(i));
}
int nextID;
@@ -108,7 +110,7 @@ public class OperationSyncLogService implements Runnable {
} else {
nextID = logFileIDList.get(firstID - 1) + 1;
}
- logFileID = nextID % maxLogFileNum;
+ logFileID = nextID % MAX_LOG_FILE_NUM;
} else {
logFileID = 0;
}
@@ -117,7 +119,8 @@ public class OperationSyncLogService implements Runnable {
// Check the validity of logFile
logWriterLock.lock();
try {
- if (logWriter != null && System.currentTimeMillis() - logFileCreateTime > logFileValidity) {
+ if (logWriter != null
+ && System.currentTimeMillis() - logFileCreateTime > LOG_FILE_VALIDITY) {
// Submit logFile when it's expired
submitLogFile();
}
@@ -161,10 +164,10 @@ public class OperationSyncLogService implements Runnable {
}
protector.registerLogFile(
- logFileDir
+ LOG_FILE_DIR
+ File.separator
+ logFileName
- + (logFileID - 1 + maxLogFileNum) % maxLogFileNum);
+ + (logFileID - 1 + MAX_LOG_FILE_NUM) % MAX_LOG_FILE_NUM);
logWriter = null;
logFile = null;
@@ -172,7 +175,7 @@ public class OperationSyncLogService implements Runnable {
private void createLogFile() {
logFile =
- SystemFileFactory.INSTANCE.getFile(logFileDir + File.separator + logFileName + logFileID);
+ SystemFileFactory.INSTANCE.getFile(LOG_FILE_DIR + File.separator + logFileName + logFileID);
while (true) {
try {
if (logFile.createNewFile()) {
@@ -190,7 +193,7 @@ public class OperationSyncLogService implements Runnable {
}
}
}
- logFileID = (logFileID + 1) % maxLogFileNum;
+ logFileID = (logFileID + 1) % MAX_LOG_FILE_NUM;
}
public static synchronized void incLogFileSize(long size) {
@@ -202,7 +205,7 @@ public class OperationSyncLogService implements Runnable {
}
public void write(ByteBuffer buffer) throws IOException {
- if (currentLogFileSize < maxLogFileSize) {
+ if (currentLogFileSize < MAX_LOG_FILE_SIZE) {
if (logWriter == null) {
// Create logFile when there are no valid
createLogFile();
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
index 774cb3e2e1..dccd4db016 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncPlanTypeUtils.java
@@ -24,23 +24,32 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeactivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
public class OperationSyncPlanTypeUtils {
public static OperationSyncPlanType getOperationSyncPlanType(PhysicalPlan plan) {
- if (plan instanceof SetStorageGroupPlan
+ if (plan instanceof DeletePlan || plan instanceof InsertPlan) {
+ return OperationSyncPlanType.DMLPlan;
+ } else if (plan instanceof SetStorageGroupPlan
|| plan instanceof DeleteStorageGroupPlan
|| plan instanceof CreateTimeSeriesPlan
|| plan instanceof CreateMultiTimeSeriesPlan
@@ -54,10 +63,15 @@ public class OperationSyncPlanTypeUtils {
|| plan instanceof DropTemplatePlan
|| plan instanceof PruneTemplatePlan
|| plan instanceof SetTemplatePlan
- || plan instanceof UnsetTemplatePlan) {
+ || plan instanceof UnsetTemplatePlan
+ || plan instanceof SetTTLPlan
+ || plan instanceof CreateContinuousQueryPlan
+ || plan instanceof DataAuthPlan
+ || plan instanceof DropContinuousQueryPlan
+ || plan instanceof ChangeAliasPlan
+ || plan instanceof ChangeTagOffsetPlan
+ || plan instanceof AuthorPlan) {
return OperationSyncPlanType.DDLPlan;
- } else if (plan instanceof DeletePlan || plan instanceof InsertPlan) {
- return OperationSyncPlanType.DMLPlan;
}
return null;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
index 2f23b97e05..fbaf0ae463 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProducer.java
@@ -23,7 +23,9 @@ import org.apache.iotdb.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
/**
@@ -33,22 +35,48 @@ import java.util.concurrent.BlockingQueue;
public class OperationSyncProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProducer.class);
-
- private final BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- operationSyncQueue;
+ private final ArrayList<
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
+ operationSyncQueues;
+ private final OperationSyncLogService dmlLogService;
public OperationSyncProducer(
- BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- operationSyncQueue) {
- this.operationSyncQueue = operationSyncQueue;
+ ArrayList<BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
+ operationSyncQueue,
+ OperationSyncLogService operationSyncDMLLogService) {
+ this.operationSyncQueues = operationSyncQueue;
+ this.dmlLogService = operationSyncDMLLogService;
}
- public void put(Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair) {
+ public void put(
+ Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType> planPair,
+ String deviceName) {
+
+ ByteBuffer headBuffer;
+ headBuffer = planPair.left;
+ headBuffer.position(0);
+ int index;
+ if (deviceName == null) {
+ index = operationSyncQueues.size() - 1;
+ } else {
+ try {
+ index = Math.abs(deviceName.hashCode()) % (operationSyncQueues.size() - 1);
+ } catch (Exception e) {
+ index = 0;
+ }
+ }
+ if (operationSyncQueues.get(index).offer(planPair)) {
+ return;
+ }
try {
- planPair.left.position(0);
- operationSyncQueue.put(planPair);
- } catch (InterruptedException e) {
- LOGGER.error("OperationSync cache failed.", e);
+ // must set buffer position to limit() before serialization
+ headBuffer.position(headBuffer.limit());
+ dmlLogService.acquireLogWriter();
+ dmlLogService.write(headBuffer);
+ } catch (IOException e) {
+ LOGGER.error("OperationSyncConsumer can't serialize physicalPlan", e);
+ } finally {
+ dmlLogService.releaseLogWriter();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
index a9ff399c3b..c18af522c5 100644
--- a/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
+++ b/server/src/main/java/org/apache/iotdb/db/doublelive/OperationSyncProtector.java
@@ -41,7 +41,7 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class OperationSyncProtector implements Runnable {
protected static final Logger LOGGER = LoggerFactory.getLogger(OperationSyncProtector.class);
- protected static final int logFileValidity =
+ protected static final int LOG_FILE_VALIDITY =
IoTDBDescriptor.getInstance().getConfig().getOperationSyncLogValidity();
// For transmit log files
@@ -105,7 +105,7 @@ public abstract class OperationSyncProtector implements Runnable {
try {
// Sleep a while before next check
- TimeUnit.SECONDS.sleep(logFileValidity);
+ TimeUnit.SECONDS.sleep(LOG_FILE_VALIDITY);
} catch (InterruptedException e) {
LOGGER.warn("OperationSyncProtector been interrupted", e);
}
@@ -126,7 +126,7 @@ public abstract class OperationSyncProtector implements Runnable {
e);
continue;
}
-
+ LOGGER.info("begin trans " + logFileName);
while (logReader.hasNext()) {
// read and re-serialize the PhysicalPlan
PhysicalPlan nextPlan = logReader.next();
@@ -140,7 +140,7 @@ public abstract class OperationSyncProtector implements Runnable {
protectorByteStream.reset();
transmitPhysicalPlan(nextBuffer, nextPlan);
}
-
+ LOGGER.info("end trans " + logFileName);
logReader.close();
try {
// sleep one second then delete OperationSyncLog
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 034773b55a..9d80720866 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -66,13 +66,20 @@ import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.service.metrics.MetricsService;
+import org.apache.iotdb.db.service.metrics.enums.Metric;
+import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.db.utils.ThreadUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
+import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
+import org.apache.iotdb.metrics.utils.MetricLevel;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.session.pool.SessionPool;
+import org.apache.iotdb.session.util.SystemStatus;
import org.apache.iotdb.tsfile.utils.FilePathUtils;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -105,23 +112,26 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class StorageEngine implements IService {
private static final Logger logger = LoggerFactory.getLogger(StorageEngine.class);
-
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+ private static final long HEARTBEAT_CHECK_INTERVAL = 30L;
+ private static final long SECONDARY_METRIC_INTERVAL = 30L;
/* OperationSync module */
private static final boolean isEnableOperationSync =
IoTDBDescriptor.getInstance().getConfig().isEnableOperationSync();
- private static SessionPool operationSyncsessionPool;
+ private static SessionPool operationSyncSessionPool;
private static OperationSyncProducer operationSyncProducer;
private static OperationSyncDDLProtector operationSyncDDLProtector;
private static OperationSyncLogService operationSyncDDLLogService;
+ private static AtomicBoolean isSecondaryAlive = new AtomicBoolean(false);
/**
* Time range for dividing storage group, the time unit is the same with IoTDB's
@@ -156,55 +166,81 @@ public class StorageEngine implements IService {
// add customized listeners here for flush and close events
private List<CloseFileListener> customCloseFileListeners = new ArrayList<>();
private List<FlushListener> customFlushListeners = new ArrayList<>();
+ ArrayList<BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>>
+ arrayListBlockQueue;
private StorageEngine() {
if (isEnableOperationSync) {
- /* Open OperationSync */
+ // Open OperationSync
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ int cacheNum = config.getOperationSyncProducerCacheNum();
+
// create SessionPool for OperationSync
- operationSyncsessionPool =
+ operationSyncSessionPool =
new SessionPool(
config.getSecondaryAddress(),
config.getSecondaryPort(),
config.getSecondaryUser(),
config.getSecondaryPassword(),
- 5,
+ config.getSecondarySessionPoolMaxSize(),
config.isRpcThriftCompressionEnable());
+ ThreadPoolExecutor threadPool =
+ new ThreadPoolExecutor(
+ cacheNum + 5,
+ cacheNum + 5,
+ 3,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(5),
+ new ThreadPoolExecutor.AbortPolicy());
// create operationSyncDDLProtector and operationSyncDDLLogService
- operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncsessionPool);
- new Thread(operationSyncDDLProtector).start();
+ ScheduledExecutorService secondaryCheckThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("secondary-Check");
+
+ secondaryCheckThread.scheduleAtFixedRate(
+ this::checkSecondaryIsLife, 0L, HEARTBEAT_CHECK_INTERVAL, TimeUnit.SECONDS);
+
+ operationSyncDDLProtector = new OperationSyncDDLProtector(operationSyncSessionPool);
+ threadPool.execute(operationSyncDDLProtector);
operationSyncDDLLogService =
new OperationSyncLogService("OperationSyncDDLLog", operationSyncDDLProtector);
- new Thread(operationSyncDDLLogService).start();
+ threadPool.execute(operationSyncDDLLogService);
// create OperationSyncProducer
- BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
- blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
- operationSyncProducer = new OperationSyncProducer(blockingQueue);
-
- // create OperationSyncDMLProtector and OperationSyncDMLLogService
+ arrayListBlockQueue = new ArrayList<>(cacheNum);
+ for (int i = 0; i < cacheNum; i++) {
+ BlockingQueue<Pair<ByteBuffer, OperationSyncPlanTypeUtils.OperationSyncPlanType>>
+ blockingQueue = new ArrayBlockingQueue<>(config.getOperationSyncProducerCacheSize());
+ arrayListBlockQueue.add(blockingQueue);
+ }
OperationSyncDMLProtector operationSyncDMLProtector =
- new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncProducer);
- new Thread(operationSyncDMLProtector).start();
+ new OperationSyncDMLProtector(operationSyncDDLProtector, operationSyncSessionPool);
+ threadPool.execute(operationSyncDMLProtector);
OperationSyncLogService operationSyncDMLLogService =
new OperationSyncLogService("OperationSyncDMLLog", operationSyncDMLProtector);
- new Thread(operationSyncDMLLogService).start();
-
+ operationSyncProducer =
+ new OperationSyncProducer(arrayListBlockQueue, operationSyncDMLLogService);
+ // create OperationSyncDMLProtector and OperationSyncDMLLogService
+ threadPool.execute(operationSyncDMLLogService);
// create OperationSyncConsumer
- for (int i = 0; i < config.getOperationSyncConsumerConcurrencySize(); i++) {
- OperationSyncConsumer consumer =
+ for (int i = 0; i < cacheNum; i++) {
+ threadPool.execute(
new OperationSyncConsumer(
- blockingQueue, operationSyncsessionPool, operationSyncDMLLogService);
- new Thread(consumer).start();
+ arrayListBlockQueue.get(i), operationSyncSessionPool, operationSyncDMLLogService));
+ }
+ if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
+ ScheduledExecutorService secondaryMetricThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("secondary-Metric");
+ secondaryMetricThread.scheduleAtFixedRate(
+ this::secondaryMetric, 0L, SECONDARY_METRIC_INTERVAL, TimeUnit.SECONDS);
}
-
logger.info("Successfully initialize OperationSync!");
} else {
- operationSyncsessionPool = null;
+ operationSyncSessionPool = null;
operationSyncProducer = null;
operationSyncDDLProtector = null;
operationSyncDDLLogService = null;
+ arrayListBlockQueue = null;
}
}
@@ -246,14 +282,14 @@ public class StorageEngine implements IService {
OperationSyncWriteTask ddlTask =
new OperationSyncWriteTask(
buffer,
- operationSyncsessionPool,
+ operationSyncSessionPool,
operationSyncDDLProtector,
operationSyncDDLLogService);
ddlTask.run();
break;
case DMLPlan:
// Put into OperationSyncProducer
- operationSyncProducer.put(new Pair<>(buffer, planType));
+ operationSyncProducer.put(new Pair<>(buffer, planType), getDeviceNameByPlan(physicalPlan));
}
}
@@ -273,6 +309,25 @@ public class StorageEngine implements IService {
return result;
}
+ public static String getDeviceNameByPlan(PhysicalPlan plan) {
+ if (plan instanceof InsertPlan) {
+ InsertPlan physicalPlan = (InsertPlan) plan;
+ if (physicalPlan.getDevicePath() == null) {
+ if (physicalPlan.getPaths() != null && physicalPlan.getPaths().size() > 0) {
+ return physicalPlan.getPaths().get(0).getDevice();
+ } else {
+ return null;
+ }
+ }
+ return physicalPlan.getDevicePath().getDevice();
+ }
+ return null;
+ }
+
+ public static AtomicBoolean isSecondaryAlive() {
+ return isSecondaryAlive;
+ }
+
public static long getTimePartitionInterval() {
if (timePartitionInterval == -1) {
initTimePartition();
@@ -393,6 +448,29 @@ public class StorageEngine implements IService {
startTimedService();
}
+ private void checkSecondaryIsLife() {
+ try {
+ isSecondaryAlive.set(operationSyncSessionPool.getSystemStatus() == SystemStatus.NORMAL);
+ } catch (IoTDBConnectionException e) {
+ isSecondaryAlive.set(false);
+ }
+ }
+
+ private void secondaryMetric() {
+ for (int i = 0; i < arrayListBlockQueue.size(); i++) {
+ MetricsService.getInstance()
+ .getMetricManager()
+ .getOrCreateGauge(
+ Metric.QUEUE.toString(),
+ MetricLevel.IMPORTANT,
+ Tag.NAME.toString(),
+ "OperationSyncProducerQueueSize" + i,
+ Tag.STATUS.toString(),
+ "running")
+ .set(arrayListBlockQueue.get(i).size());
+ }
+ }
+
private void checkTTL() {
try {
IoTDB.metaManager.checkTTLOnLastCache();
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
index 79ad566f59..9663da20fc 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
@@ -140,6 +142,21 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
buffer.putLong(firstExecutionTimeBoundary);
}
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.write((byte) PhysicalPlanType.CREATE_CONTINUOUS_QUERY.ordinal());
+ ReadWriteIOUtils.write(continuousQueryName, stream);
+ ReadWriteIOUtils.write(querySql, stream);
+ ReadWriteIOUtils.write(querySqlBeforeGroupByClause, stream);
+ ReadWriteIOUtils.write(querySqlAfterGroupByClause, stream);
+ ReadWriteIOUtils.write(targetPath.getFullPath(), stream);
+ stream.writeLong(everyInterval);
+ stream.writeLong(forInterval);
+ stream.writeLong(groupByTimeInterval);
+ ReadWriteIOUtils.write(groupByTimeIntervalString, stream);
+ stream.writeLong(firstExecutionTimeBoundary);
+ }
+
@Override
public void deserialize(ByteBuffer buffer) throws IllegalPathException {
continuousQueryName = ReadWriteIOUtils.readString(buffer);
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
index 463b03c408..5d273c2e33 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/DropContinuousQueryPlan.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -60,4 +62,10 @@ public class DropContinuousQueryPlan extends PhysicalPlan {
public void deserialize(ByteBuffer buffer) {
continuousQueryName = ReadWriteIOUtils.readString(buffer);
}
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.write((byte) PhysicalPlanType.DROP_CONTINUOUS_QUERY.ordinal());
+ ReadWriteIOUtils.write(continuousQueryName, stream);
+ }
}