You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/11 03:52:44 UTC

[iotdb] 02/03: add tryExecuteCQTaskOnceBeforeRegistration

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2277
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit a1ecdac76c208f32e79b59b60c40c4a3f689cd16
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jan 11 11:43:51 2022 +0800

    add tryExecuteCQTaskOnceBeforeRegistration
---
 .../apache/iotdb/db/cq/ContinuousQueryService.java | 55 +++++++++++++---------
 .../db/exception/ContinuousQueryException.java     |  6 +++
 .../iotdb/db/qp/strategy/LogicalChecker.java       |  5 ++
 3 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index 0a6ac1f..8f3e758 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -48,7 +48,7 @@ public class ContinuousQueryService implements IService {
       ContinuousQueryTaskPoolManager.getInstance();
   private static final long TASK_SUBMIT_CHECK_INTERVAL =
       IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
-  private ScheduledExecutorService cqTasksSubmitThread;
+  private ScheduledExecutorService continuousQueryTaskSubmitThread;
 
   private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
       new ConcurrentHashMap<>();
@@ -71,9 +71,9 @@ public class ContinuousQueryService implements IService {
       nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
     }
 
-    cqTasksSubmitThread =
+    continuousQueryTaskSubmitThread =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
-    cqTasksSubmitThread.scheduleAtFixedRate(
+    continuousQueryTaskSubmitThread.scheduleAtFixedRate(
         this::checkAndSubmitTasks,
         0,
         TASK_SUBMIT_CHECK_INTERVAL,
@@ -85,7 +85,6 @@ public class ContinuousQueryService implements IService {
 
   private void checkAndSubmitTasks() {
     long currentTimestamp = DatetimeUtils.currentTime();
-
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
       long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
       while (currentTimestamp >= nextExecutionTimestamp) {
@@ -98,13 +97,13 @@ public class ContinuousQueryService implements IService {
 
   @Override
   public void stop() {
-    if (cqTasksSubmitThread != null) {
-      cqTasksSubmitThread.shutdown();
+    if (continuousQueryTaskSubmitThread != null) {
+      continuousQueryTaskSubmitThread.shutdown();
       try {
-        cqTasksSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+        continuousQueryTaskSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         LOGGER.warn("Check thread still doesn't exit after 60s");
-        cqTasksSubmitThread.shutdownNow();
+        continuousQueryTaskSubmitThread.shutdownNow();
         Thread.currentThread().interrupt();
       }
     }
@@ -122,19 +121,21 @@ public class ContinuousQueryService implements IService {
 
   public boolean register(CreateContinuousQueryPlan plan, boolean shouldWriteLog)
       throws ContinuousQueryException {
+    if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+      throw new ContinuousQueryException(
+          String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
+    }
+
+    // some exceptions will only occur at runtime
+    tryExecuteCQTaskOnceBeforeRegistration(plan);
+
     acquireRegistrationLock();
     try {
-      if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
-        throw new ContinuousQueryException(
-            String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
-      }
       if (shouldWriteLog) {
         IoTDB.metaManager.createContinuousQuery(plan);
       }
       doRegister(plan);
       return true;
-    } catch (ContinuousQueryException e) {
-      throw e;
     } catch (Exception e) {
       throw new ContinuousQueryException(e.getMessage());
     } finally {
@@ -142,9 +143,22 @@ public class ContinuousQueryService implements IService {
     }
   }
 
+  private void tryExecuteCQTaskOnceBeforeRegistration(CreateContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    try {
+      new ContinuousQueryTask(plan, plan.getCreationTimestamp()).run();
+    } catch (Exception e) {
+      throw new ContinuousQueryException("Failed to create continuous query task.", e);
+    }
+  }
+
   private void doRegister(CreateContinuousQueryPlan plan) {
     continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
-    nextExecutionTimestamps.put(plan.getContinuousQueryName(), plan.getCreationTimestamp());
+    // one cq task has been executed in tryExecuteCQTaskOnceBeforeRegistration
+    // so nextExecutionTimestamp should start with
+    //     plan.getCreationTimestamp() + plan.getEveryInterval()
+    nextExecutionTimestamps.put(
+        plan.getContinuousQueryName(), plan.getCreationTimestamp() + plan.getEveryInterval());
   }
 
   public void deregisterAll() throws ContinuousQueryException {
@@ -154,17 +168,16 @@ public class ContinuousQueryService implements IService {
   }
 
   public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
+    if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+      throw new ContinuousQueryException(
+          String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
+    }
+
     acquireRegistrationLock();
     try {
-      if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
-        throw new ContinuousQueryException(
-            String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
-      }
       IoTDB.metaManager.dropContinuousQuery(plan);
       doDeregister(plan);
       return true;
-    } catch (ContinuousQueryException e) {
-      throw e;
     } catch (Exception e) {
       throw new ContinuousQueryException(e.getMessage());
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
index 53108bb..e846e98 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
@@ -27,4 +27,10 @@ public class ContinuousQueryException extends StorageEngineException {
     super(message, TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode());
     this.isUserException = true;
   }
+
+  public ContinuousQueryException(String message, Exception e) {
+    super(message, e);
+    this.errorCode = TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode();
+    this.isUserException = true;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
index 14234d5..3013966 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectIntoOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateContinuousQueryOperator;
 
 public class LogicalChecker {
 
@@ -37,5 +38,9 @@ public class LogicalChecker {
     if (operator instanceof SelectIntoOperator) {
       ((SelectIntoOperator) operator).check();
     }
+
+    if (operator instanceof CreateContinuousQueryOperator) {
+      ((CreateContinuousQueryOperator) operator).getQueryOperator().check();
+    }
   }
 }