You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/14 12:03:37 UTC
[iotdb] branch iotdb-2353-cq updated: support show and fix recover bug
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-2353-cq
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-2353-cq by this push:
new befa4df support show and fix recover bug
befa4df is described below
commit befa4df08f53e102e075ca01059590a167c4f1df
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Jan 14 20:01:45 2022 +0800
support show and fix recover bug
---
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 +
.../apache/iotdb/db/cq/ContinuousQueryService.java | 8 ++++++--
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 4 ++++
.../query/dataset/ShowContinuousQueriesResult.java | 21 ++++++---------------
.../java/org/apache/iotdb/db/service/IoTDB.java | 4 ++--
5 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 6013210..c882b39 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -103,6 +103,7 @@ public class IoTDBConstant {
public static final String COLUMN_CONTINUOUS_QUERY_NAME = "cq name";
public static final String COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL = "every interval";
public static final String COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL = "for interval";
+ public static final String COLUMN_CONTINUOUS_QUERY_BOUNDARY = "boundary";
public static final String COLUMN_CONTINUOUS_QUERY_TARGET_PATH = "target path";
public static final String COLUMN_CONTINUOUS_QUERY_QUERY_SQL = "query sql";
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index c92bf8f..58e11f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -142,7 +142,10 @@ public class ContinuousQueryService implements IService {
String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
}
- checkSchemaBeforeRegistration(plan);
+ // if it is not processing recovery
+ if (shouldWriteLog) {
+ checkSchemaBeforeRegistration(plan);
+ }
acquireRegistrationLock();
try {
@@ -216,7 +219,8 @@ public class ContinuousQueryService implements IService {
plan.getContinuousQueryName(),
plan.getTargetPath(),
plan.getEveryInterval(),
- plan.getForInterval()));
+ plan.getForInterval(),
+ plan.getFirstExecutionTimeBoundary()));
}
return results;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 83e3e1d..0966e6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -189,6 +189,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_BOUNDARY;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
@@ -1035,12 +1036,14 @@ public class PlanExecutor implements IPlanExecutor {
new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+ new PartialPath(COLUMN_CONTINUOUS_QUERY_BOUNDARY, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
Arrays.asList(
TSDataType.TEXT,
TSDataType.INT64,
TSDataType.INT64,
+ TSDataType.INT64,
TSDataType.TEXT,
TSDataType.TEXT));
@@ -1052,6 +1055,7 @@ public class PlanExecutor implements IPlanExecutor {
record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
record.addField(result.getEveryInterval(), TSDataType.INT64);
record.addField(result.getForInterval(), TSDataType.INT64);
+ record.addField(result.getBoundary(), TSDataType.INT64);
record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
listDataSet.putRecord(record);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
index afc96cf..5be2c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
@@ -28,36 +28,31 @@ public class ShowContinuousQueriesResult extends ShowResult {
private PartialPath targetPath;
private long everyInterval;
private long forInterval;
+ private long boundary;
public ShowContinuousQueriesResult(
String querySql,
String continuousQueryName,
PartialPath targetPath,
long everyInterval,
- long forInterval) {
+ long forInterval,
+ long boundary) {
this.querySql = querySql;
this.continuousQueryName = continuousQueryName;
this.targetPath = targetPath;
this.everyInterval = everyInterval;
this.forInterval = forInterval;
+ this.boundary = boundary;
}
public String getQuerySql() {
return querySql;
}
- public void setQuerySql(String querySql) {
- this.querySql = querySql;
- }
-
public String getContinuousQueryName() {
return continuousQueryName;
}
- public void setContinuousQueryName(String continuousQueryName) {
- this.continuousQueryName = continuousQueryName;
- }
-
public PartialPath getTargetPath() {
return targetPath;
}
@@ -70,15 +65,11 @@ public class ShowContinuousQueriesResult extends ShowResult {
return everyInterval;
}
- public void setEveryInterval(long everyInterval) {
- this.everyInterval = everyInterval;
- }
-
public long getForInterval() {
return forInterval;
}
- public void setForInterval(long forInterval) {
- this.forInterval = forInterval;
+ public long getBoundary() {
+ return boundary;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 4f754ea..4c3468a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -137,8 +137,6 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(TemporaryQueryDataFileService.getInstance());
registerManager.register(UDFClassLoaderManager.getInstance());
registerManager.register(UDFRegistrationService.getInstance());
- registerManager.register(TriggerRegistrationService.getInstance());
- registerManager.register(ContinuousQueryService.getInstance());
// in cluster mode, RPC service is not enabled.
if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
@@ -170,6 +168,8 @@ public class IoTDB implements IoTDBMBean {
registerManager.register(SyncServerManager.getInstance());
registerManager.register(UpgradeSevice.getINSTANCE());
registerManager.register(SettleService.getINSTANCE());
+ registerManager.register(TriggerRegistrationService.getInstance());
+ registerManager.register(ContinuousQueryService.getInstance());
logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");
}