You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/11 03:52:44 UTC
[iotdb] 02/03: add tryExecuteCQTaskOnceBeforeRegistration
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-2277
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a1ecdac76c208f32e79b59b60c40c4a3f689cd16
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jan 11 11:43:51 2022 +0800
add tryExecuteCQTaskOnceBeforeRegistration
---
.../apache/iotdb/db/cq/ContinuousQueryService.java | 55 +++++++++++++---------
.../db/exception/ContinuousQueryException.java | 6 +++
.../iotdb/db/qp/strategy/LogicalChecker.java | 5 ++
3 files changed, 45 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index 0a6ac1f..8f3e758 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -48,7 +48,7 @@ public class ContinuousQueryService implements IService {
ContinuousQueryTaskPoolManager.getInstance();
private static final long TASK_SUBMIT_CHECK_INTERVAL =
IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
- private ScheduledExecutorService cqTasksSubmitThread;
+ private ScheduledExecutorService continuousQueryTaskSubmitThread;
private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
new ConcurrentHashMap<>();
@@ -71,9 +71,9 @@ public class ContinuousQueryService implements IService {
nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
}
- cqTasksSubmitThread =
+ continuousQueryTaskSubmitThread =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
- cqTasksSubmitThread.scheduleAtFixedRate(
+ continuousQueryTaskSubmitThread.scheduleAtFixedRate(
this::checkAndSubmitTasks,
0,
TASK_SUBMIT_CHECK_INTERVAL,
@@ -85,7 +85,6 @@ public class ContinuousQueryService implements IService {
private void checkAndSubmitTasks() {
long currentTimestamp = DatetimeUtils.currentTime();
-
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
while (currentTimestamp >= nextExecutionTimestamp) {
@@ -98,13 +97,13 @@ public class ContinuousQueryService implements IService {
@Override
public void stop() {
- if (cqTasksSubmitThread != null) {
- cqTasksSubmitThread.shutdown();
+ if (continuousQueryTaskSubmitThread != null) {
+ continuousQueryTaskSubmitThread.shutdown();
try {
- cqTasksSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+ continuousQueryTaskSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("Check thread still doesn't exit after 60s");
- cqTasksSubmitThread.shutdownNow();
+ continuousQueryTaskSubmitThread.shutdownNow();
Thread.currentThread().interrupt();
}
}
@@ -122,19 +121,21 @@ public class ContinuousQueryService implements IService {
public boolean register(CreateContinuousQueryPlan plan, boolean shouldWriteLog)
throws ContinuousQueryException {
+ if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+ throw new ContinuousQueryException(
+ String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
+ }
+
+ // some exceptions will only occur at runtime
+ tryExecuteCQTaskOnceBeforeRegistration(plan);
+
acquireRegistrationLock();
try {
- if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
- throw new ContinuousQueryException(
- String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
- }
if (shouldWriteLog) {
IoTDB.metaManager.createContinuousQuery(plan);
}
doRegister(plan);
return true;
- } catch (ContinuousQueryException e) {
- throw e;
} catch (Exception e) {
throw new ContinuousQueryException(e.getMessage());
} finally {
@@ -142,9 +143,22 @@ public class ContinuousQueryService implements IService {
}
}
+ private void tryExecuteCQTaskOnceBeforeRegistration(CreateContinuousQueryPlan plan)
+ throws ContinuousQueryException {
+ try {
+ new ContinuousQueryTask(plan, plan.getCreationTimestamp()).run();
+ } catch (Exception e) {
+ throw new ContinuousQueryException("Failed to create continuous query task.", e);
+ }
+ }
+
private void doRegister(CreateContinuousQueryPlan plan) {
continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
- nextExecutionTimestamps.put(plan.getContinuousQueryName(), plan.getCreationTimestamp());
+ // one cq task has been executed in tryExecuteCQTaskOnceBeforeRegistration
+ // so nextExecutionTimestamp should start with
+ // plan.getCreationTimestamp() + plan.getEveryInterval()
+ nextExecutionTimestamps.put(
+ plan.getContinuousQueryName(), plan.getCreationTimestamp() + plan.getEveryInterval());
}
public void deregisterAll() throws ContinuousQueryException {
@@ -154,17 +168,16 @@ public class ContinuousQueryService implements IService {
}
public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
+ if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+ throw new ContinuousQueryException(
+ String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
+ }
+
acquireRegistrationLock();
try {
- if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
- throw new ContinuousQueryException(
- String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
- }
IoTDB.metaManager.dropContinuousQuery(plan);
doDeregister(plan);
return true;
- } catch (ContinuousQueryException e) {
- throw e;
} catch (Exception e) {
throw new ContinuousQueryException(e.getMessage());
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
index 53108bb..e846e98 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
@@ -27,4 +27,10 @@ public class ContinuousQueryException extends StorageEngineException {
super(message, TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode());
this.isUserException = true;
}
+
+ public ContinuousQueryException(String message, Exception e) {
+ super(message, e);
+ this.errorCode = TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode();
+ this.isUserException = true;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
index 14234d5..3013966 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectIntoOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateContinuousQueryOperator;
public class LogicalChecker {
@@ -37,5 +38,9 @@ public class LogicalChecker {
if (operator instanceof SelectIntoOperator) {
((SelectIntoOperator) operator).check();
}
+
+ if (operator instanceof CreateContinuousQueryOperator) {
+ ((CreateContinuousQueryOperator) operator).getQueryOperator().check();
+ }
}
}