You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/14 12:03:37 UTC

[iotdb] branch iotdb-2353-cq updated: support show and fix recover bug

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2353-cq
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-2353-cq by this push:
     new befa4df  support show and fix recover bug
befa4df is described below

commit befa4df08f53e102e075ca01059590a167c4f1df
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Jan 14 20:01:45 2022 +0800

    support show and fix recover bug
---
 .../org/apache/iotdb/db/conf/IoTDBConstant.java     |  1 +
 .../apache/iotdb/db/cq/ContinuousQueryService.java  |  8 ++++++--
 .../apache/iotdb/db/qp/executor/PlanExecutor.java   |  4 ++++
 .../query/dataset/ShowContinuousQueriesResult.java  | 21 ++++++---------------
 .../java/org/apache/iotdb/db/service/IoTDB.java     |  4 ++--
 5 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index 6013210..c882b39 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -103,6 +103,7 @@ public class IoTDBConstant {
   public static final String COLUMN_CONTINUOUS_QUERY_NAME = "cq name";
   public static final String COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL = "every interval";
   public static final String COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL = "for interval";
+  public static final String COLUMN_CONTINUOUS_QUERY_BOUNDARY = "boundary";
   public static final String COLUMN_CONTINUOUS_QUERY_TARGET_PATH = "target path";
   public static final String COLUMN_CONTINUOUS_QUERY_QUERY_SQL = "query sql";
 
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index c92bf8f..58e11f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -142,7 +142,10 @@ public class ContinuousQueryService implements IService {
           String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
     }
 
-    checkSchemaBeforeRegistration(plan);
+    // if it is not processing recovery
+    if (shouldWriteLog) {
+      checkSchemaBeforeRegistration(plan);
+    }
 
     acquireRegistrationLock();
     try {
@@ -216,7 +219,8 @@ public class ContinuousQueryService implements IService {
               plan.getContinuousQueryName(),
               plan.getTargetPath(),
               plan.getEveryInterval(),
-              plan.getForInterval()));
+              plan.getForInterval(),
+              plan.getFirstExecutionTimeBoundary()));
     }
     return results;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 83e3e1d..0966e6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -189,6 +189,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_BOUNDARY;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
@@ -1035,12 +1036,14 @@ public class PlanExecutor implements IPlanExecutor {
                 new PartialPath(COLUMN_CONTINUOUS_QUERY_NAME, false),
                 new PartialPath(COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL, false),
                 new PartialPath(COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL, false),
+                new PartialPath(COLUMN_CONTINUOUS_QUERY_BOUNDARY, false),
                 new PartialPath(COLUMN_CONTINUOUS_QUERY_QUERY_SQL, false),
                 new PartialPath(COLUMN_CONTINUOUS_QUERY_TARGET_PATH, false)),
             Arrays.asList(
                 TSDataType.TEXT,
                 TSDataType.INT64,
                 TSDataType.INT64,
+                TSDataType.INT64,
                 TSDataType.TEXT,
                 TSDataType.TEXT));
 
@@ -1052,6 +1055,7 @@ public class PlanExecutor implements IPlanExecutor {
       record.addField(Binary.valueOf(result.getContinuousQueryName()), TSDataType.TEXT);
       record.addField(result.getEveryInterval(), TSDataType.INT64);
       record.addField(result.getForInterval(), TSDataType.INT64);
+      record.addField(result.getBoundary(), TSDataType.INT64);
       record.addField(Binary.valueOf(result.getQuerySql()), TSDataType.TEXT);
       record.addField(Binary.valueOf(result.getTargetPath().getFullPath()), TSDataType.TEXT);
       listDataSet.putRecord(record);
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
index afc96cf..5be2c61 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/ShowContinuousQueriesResult.java
@@ -28,36 +28,31 @@ public class ShowContinuousQueriesResult extends ShowResult {
   private PartialPath targetPath;
   private long everyInterval;
   private long forInterval;
+  private long boundary;
 
   public ShowContinuousQueriesResult(
       String querySql,
       String continuousQueryName,
       PartialPath targetPath,
       long everyInterval,
-      long forInterval) {
+      long forInterval,
+      long boundary) {
     this.querySql = querySql;
     this.continuousQueryName = continuousQueryName;
     this.targetPath = targetPath;
     this.everyInterval = everyInterval;
     this.forInterval = forInterval;
+    this.boundary = boundary;
   }
 
   public String getQuerySql() {
     return querySql;
   }
 
-  public void setQuerySql(String querySql) {
-    this.querySql = querySql;
-  }
-
   public String getContinuousQueryName() {
     return continuousQueryName;
   }
 
-  public void setContinuousQueryName(String continuousQueryName) {
-    this.continuousQueryName = continuousQueryName;
-  }
-
   public PartialPath getTargetPath() {
     return targetPath;
   }
@@ -70,15 +65,11 @@ public class ShowContinuousQueriesResult extends ShowResult {
     return everyInterval;
   }
 
-  public void setEveryInterval(long everyInterval) {
-    this.everyInterval = everyInterval;
-  }
-
   public long getForInterval() {
     return forInterval;
   }
 
-  public void setForInterval(long forInterval) {
-    this.forInterval = forInterval;
+  public long getBoundary() {
+    return boundary;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 4f754ea..4c3468a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -137,8 +137,6 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(TemporaryQueryDataFileService.getInstance());
     registerManager.register(UDFClassLoaderManager.getInstance());
     registerManager.register(UDFRegistrationService.getInstance());
-    registerManager.register(TriggerRegistrationService.getInstance());
-    registerManager.register(ContinuousQueryService.getInstance());
 
     // in cluster mode, RPC service is not enabled.
     if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
@@ -170,6 +168,8 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(SyncServerManager.getInstance());
     registerManager.register(UpgradeSevice.getINSTANCE());
     registerManager.register(SettleService.getINSTANCE());
+    registerManager.register(TriggerRegistrationService.getInstance());
+    registerManager.register(ContinuousQueryService.getInstance());
 
     logger.info("Congratulation, IoTDB is set up successfully. Now, enjoy yourself!");
   }