You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/11 05:22:15 UTC
[iotdb] branch master updated: [IOTDB-2277] CQ: No warn message when aggregate function and timeseries types do not match (#4766)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9baeae3 [IOTDB-2277] CQ: No warn message when aggregate function and timeseries types do not match (#4766)
9baeae3 is described below
commit 9baeae3f5cd7b5037607e7e49cbc0a763ff17cb3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jan 11 13:21:38 2022 +0800
[IOTDB-2277] CQ: No warn message when aggregate function and timeseries types do not match (#4766)
---
.../db/integration/IoTDBContinuousQueryIT.java | 8 +-
.../apache/iotdb/db/cq/ContinuousQueryService.java | 147 ++++++++++-----------
.../apache/iotdb/db/cq/ContinuousQueryTask.java | 73 +++++-----
.../db/cq/ContinuousQueryTaskPoolManager.java | 15 ++-
.../db/exception/ContinuousQueryException.java | 6 +
.../iotdb/db/qp/strategy/LogicalChecker.java | 5 +
6 files changed, 132 insertions(+), 122 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
index cbfff06..dbf91d7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -127,12 +127,12 @@ public class IoTDBContinuousQueryIT {
+ "GROUP BY time(1s) END");
statement.execute(
"CREATE CONTINUOUS QUERY cq2 "
- + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+ + "BEGIN SELECT avg(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+ " GROUP BY time(1s), level=3 END");
statement.execute(
"CREATE CONTINUOUS QUERY cq3 "
+ "RESAMPLE EVERY 2s FOR 2s "
- + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+ + "BEGIN SELECT min_value(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+ "GROUP BY time(1s), level=2 END");
statement.execute("DROP CONTINUOUS QUERY cq1");
@@ -166,11 +166,11 @@ public class IoTDBContinuousQueryIT {
statement.execute(
"CREATE CONTINUOUS QUERY cq1 "
- + "BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* "
+ + "BEGIN SELECT sum(temperature) INTO temperature_max FROM root.ln.*.*.* "
+ "GROUP BY time(1s) END");
statement.execute(
"CREATE CONTINUOUS QUERY cq2 "
- + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+ + "BEGIN SELECT avg(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+ " GROUP BY time(1s), level=3 END");
checkShowContinuousQueriesResult(new String[] {"cq3", "cq1", "cq2"});
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index fb7a1b2..8f3e758 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.cq;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.ContinuousQueryException;
-import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
@@ -43,39 +42,18 @@ import java.util.concurrent.locks.ReentrantLock;
public class ContinuousQueryService implements IService {
- private static long CHECK_INTERVAL =
- IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
- private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryService.class);
+ private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
+ ContinuousQueryTaskPoolManager.getInstance();
+ private static final long TASK_SUBMIT_CHECK_INTERVAL =
+ IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+ private ScheduledExecutorService continuousQueryTaskSubmitThread;
private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
new ConcurrentHashMap<>();
-
private final ConcurrentHashMap<String, Long> nextExecutionTimestamps = new ConcurrentHashMap<>();
- private final ReentrantLock registrationLock = new ReentrantLock();
-
- private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
-
- private ScheduledExecutorService checkThread;
-
- protected static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
- ContinuousQueryTaskPoolManager.getInstance();
-
- private ContinuousQueryService() {}
-
- public static ContinuousQueryService getInstance() {
- return INSTANCE;
- }
-
- public void acquireRegistrationLock() {
- registrationLock.lock();
- }
-
- public void releaseRegistrationLock() {
- registrationLock.unlock();
- }
-
@Override
public ServiceType getID() {
return ServiceType.CONTINUOUS_QUERY_SERVICE;
@@ -83,7 +61,6 @@ public class ContinuousQueryService implements IService {
@Override
public void start() {
-
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
long durationFromCreation = DatetimeUtils.currentTime() - plan.getCreationTimestamp();
long nextExecutionTimestamp =
@@ -94,76 +71,94 @@ public class ContinuousQueryService implements IService {
nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
}
- checkThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Check");
- checkThread.scheduleAtFixedRate(
+ continuousQueryTaskSubmitThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
+ continuousQueryTaskSubmitThread.scheduleAtFixedRate(
this::checkAndSubmitTasks,
0,
- CHECK_INTERVAL,
+ TASK_SUBMIT_CHECK_INTERVAL,
DatetimeUtils.timestampPrecisionStringToTimeUnit(
IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));
- logger.info("Continuous query service started.");
+ LOGGER.info("Continuous query service started.");
+ }
+
+ private void checkAndSubmitTasks() {
+ long currentTimestamp = DatetimeUtils.currentTime();
+ for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
+ long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
+ while (currentTimestamp >= nextExecutionTimestamp) {
+ TASK_POOL_MANAGER.submit(new ContinuousQueryTask(plan, nextExecutionTimestamp));
+ nextExecutionTimestamp += plan.getEveryInterval();
+ }
+ nextExecutionTimestamps.replace(plan.getContinuousQueryName(), nextExecutionTimestamp);
+ }
}
@Override
public void stop() {
- if (checkThread != null) {
- checkThread.shutdown();
+ if (continuousQueryTaskSubmitThread != null) {
+ continuousQueryTaskSubmitThread.shutdown();
try {
- checkThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+ continuousQueryTaskSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- logger.warn("Check thread still doesn't exit after 60s");
- checkThread.shutdownNow();
+ LOGGER.warn("Check thread still doesn't exit after 60s");
+ continuousQueryTaskSubmitThread.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
- @Override
- public void shutdown(long milliseconds) throws ShutdownException {
- stop();
- }
+ private final ReentrantLock registrationLock = new ReentrantLock();
- private void checkAndSubmitTasks() {
- long currentTimestamp = DatetimeUtils.currentTime();
+ public void acquireRegistrationLock() {
+ registrationLock.lock();
+ }
- for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
- long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
- while (currentTimestamp >= nextExecutionTimestamp) {
- TASK_POOL_MANAGER.submit(new ContinuousQueryTask(plan, nextExecutionTimestamp));
- nextExecutionTimestamp += plan.getEveryInterval();
- }
- nextExecutionTimestamps.replace(plan.getContinuousQueryName(), nextExecutionTimestamp);
- }
+ public void releaseRegistrationLock() {
+ registrationLock.unlock();
}
- public boolean register(CreateContinuousQueryPlan plan, boolean writeLog)
+ public boolean register(CreateContinuousQueryPlan plan, boolean shouldWriteLog)
throws ContinuousQueryException {
+ if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+ throw new ContinuousQueryException(
+ String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
+ }
- acquireRegistrationLock();
+ // some exceptions will only occur at runtime
+ tryExecuteCQTaskOnceBeforeRegistration(plan);
+ acquireRegistrationLock();
try {
- if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
- throw new ContinuousQueryException(
- String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
- }
- if (writeLog) {
+ if (shouldWriteLog) {
IoTDB.metaManager.createContinuousQuery(plan);
}
doRegister(plan);
- } catch (ContinuousQueryException e) {
- throw e;
+ return true;
} catch (Exception e) {
throw new ContinuousQueryException(e.getMessage());
} finally {
releaseRegistrationLock();
}
- return true;
+ }
+
+ private void tryExecuteCQTaskOnceBeforeRegistration(CreateContinuousQueryPlan plan)
+ throws ContinuousQueryException {
+ try {
+ new ContinuousQueryTask(plan, plan.getCreationTimestamp()).run();
+ } catch (Exception e) {
+ throw new ContinuousQueryException("Failed to create continuous query task.", e);
+ }
}
private void doRegister(CreateContinuousQueryPlan plan) {
continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
- nextExecutionTimestamps.put(plan.getContinuousQueryName(), plan.getCreationTimestamp());
+ // one cq task has been executed in tryExecuteCQTaskOnceBeforeRegistration
+ // so nextExecutionTimestamp should start with
+ // plan.getCreationTimestamp() + plan.getEveryInterval()
+ nextExecutionTimestamps.put(
+ plan.getContinuousQueryName(), plan.getCreationTimestamp() + plan.getEveryInterval());
}
public void deregisterAll() throws ContinuousQueryException {
@@ -173,25 +168,21 @@ public class ContinuousQueryService implements IService {
}
public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
+ if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+ throw new ContinuousQueryException(
+ String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
+ }
acquireRegistrationLock();
-
try {
- if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
- throw new ContinuousQueryException(
- String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
- }
IoTDB.metaManager.dropContinuousQuery(plan);
doDeregister(plan);
- } catch (ContinuousQueryException e) {
- throw e;
+ return true;
} catch (Exception e) {
throw new ContinuousQueryException(e.getMessage());
} finally {
releaseRegistrationLock();
}
-
- return true;
}
private void doDeregister(DropContinuousQueryPlan plan) {
@@ -200,11 +191,8 @@ public class ContinuousQueryService implements IService {
}
public List<ShowContinuousQueriesResult> getShowContinuousQueriesResultList() {
-
List<ShowContinuousQueriesResult> results = new ArrayList<>(continuousQueryPlans.size());
-
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
-
results.add(
new ShowContinuousQueriesResult(
plan.getQuerySql(),
@@ -213,7 +201,14 @@ public class ContinuousQueryService implements IService {
plan.getEveryInterval(),
plan.getForInterval()));
}
-
return results;
}
+
+ private ContinuousQueryService() {}
+
+ private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
+
+ public static ContinuousQueryService getInstance() {
+ return INSTANCE;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284..48d2aa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.cq;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,10 +43,12 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -54,30 +57,26 @@ import java.util.regex.Pattern;
public class ContinuousQueryTask extends WrappedRunnable {
- private static final int FETCH_SIZE = 10000;
- private static final int BATCH_SIZE = 10000;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
- private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryTask.class);
+ private static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
+ private static final int EXECUTION_BATCH_SIZE = 10000;
- // To execute the query plan
- private static PlanExecutor planExecutor;
+ // TODO: support CQ in cluster mode
+ private static BasicServiceProvider serviceProvider;
static {
try {
- planExecutor = new PlanExecutor();
+ serviceProvider = new BasicServiceProvider();
} catch (QueryProcessException e) {
- logger.error(e.getMessage());
+ LOGGER.error(e.getMessage());
}
}
// To save the continuous query info
private final CreateContinuousQueryPlan plan;
- // To transform query operator to query plan
- private static final Planner planner = new Planner();
// Next timestamp to execute a query
- private long windowEndTimestamp;
-
- private static final Pattern pattern = Pattern.compile("\\$\\{\\w+}");
+ private final long windowEndTimestamp;
public ContinuousQueryTask(CreateContinuousQueryPlan plan, long windowEndTimestamp) {
this.plan = plan;
@@ -87,31 +86,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
@Override
public void runMayThrow()
throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
- QueryFilterOptimizationException, MetadataException {
-
+ QueryFilterOptimizationException, MetadataException, TException, SQLException {
GroupByTimePlan queryPlan = generateQueryPlan();
-
if (queryPlan.getDeduplicatedPaths().isEmpty()) {
- logger.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
+ LOGGER.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
return;
}
QueryDataSet result = doQuery(queryPlan);
-
if (result == null || result.getPaths().size() == 0) {
- logger.info(plan.getContinuousQueryName() + ": query result empty");
+ LOGGER.info(plan.getContinuousQueryName() + ": query result empty");
return;
}
doInsert(result, queryPlan);
}
- public void onRejection() {
- logger.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
- }
-
private GroupByTimePlan generateQueryPlan() throws QueryProcessException {
-
QueryOperator queryOperator = plan.getQueryOperator();
// To handle the time series meta changes in different queries, i.e. creation & deletion,
@@ -121,7 +112,8 @@ public class ContinuousQueryTask extends WrappedRunnable {
// we need to save one copy of the original SelectComponent.
SelectComponent selectComponentCopy = new SelectComponent(queryOperator.getSelectComponent());
- GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator);
+ GroupByTimePlan queryPlan =
+ serviceProvider.getPlanner().cqQueryOperatorToGroupByTimePlan(queryOperator);
queryOperator.setSelectComponent(selectComponentCopy);
@@ -133,19 +125,26 @@ public class ContinuousQueryTask extends WrappedRunnable {
private QueryDataSet doQuery(GroupByTimePlan queryPlan)
throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
- IOException, InterruptedException, QueryProcessException {
- long queryId = QueryResourceManager.getInstance().assignQueryId(true);
-
+ IOException, InterruptedException, QueryProcessException, TException, SQLException {
+ final long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+ final QueryContext queryContext =
+ serviceProvider.genQueryContext(
+ queryId,
+ plan.isDebug(),
+ System.currentTimeMillis(),
+ "CQ plan",
+ IoTDBConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
try {
- return planExecutor.processQuery(queryPlan, new QueryContext(queryId));
+ return serviceProvider.createQueryDataSet(
+ queryContext, queryPlan, IoTDBConstant.DEFAULT_FETCH_SIZE);
} finally {
QueryResourceManager.getInstance().endQuery(queryId);
}
}
private void doInsert(QueryDataSet result, GroupByTimePlan queryPlan)
- throws QueryProcessException, IOException, IllegalPathException {
-
+ throws IOException, IllegalPathException, QueryProcessException, StorageGroupNotSetException,
+ StorageEngineException {
int columnSize = result.getDataTypes().size();
TSDataType dataType =
TypeInferenceUtils.getAggrDataType(
@@ -156,7 +155,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
int batchSize =
(int)
Math.min(
- BATCH_SIZE,
+ EXECUTION_BATCH_SIZE,
Math.ceil(
(float) plan.getForInterval()
/ ((GroupByClauseComponent)
@@ -186,7 +185,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
insertTabletPlans[i].setTimes(timestamps[i]);
insertTabletPlans[i].setColumns(columns[i]);
insertTabletPlans[i].setRowCount(rowNums[i]);
- planExecutor.insertTablet(insertTabletPlans[i]);
+ serviceProvider.executeNonQuery(insertTabletPlans[i]);
}
}
}
@@ -284,7 +283,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
nodes[nodes.length - 1] = nodes[nodes.length - 1].substring(0, indexOfRightBracket);
}
StringBuffer sb = new StringBuffer();
- Matcher m = pattern.matcher(this.plan.getTargetPath().getFullPath());
+ Matcher m = PATH_NODE_NAME_PATTERN.matcher(this.plan.getTargetPath().getFullPath());
while (m.find()) {
String param = m.group();
String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -294,6 +293,10 @@ public class ContinuousQueryTask extends WrappedRunnable {
return sb.toString();
}
+ public void onRejection() {
+ LOGGER.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
+ }
+
public CreateContinuousQueryPlan getCreateContinuousQueryPlan() {
return plan;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
index e38324b..5b112d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
@@ -37,17 +37,18 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ContinuousQueryTaskPoolManager.class);
- private static final int nThreads =
+ private static final int CONTINUOUS_QUERY_THREAD_NUM =
IoTDBDescriptor.getInstance().getConfig().getContinuousQueryThreadNum();
private ContinuousQueryTaskPoolManager() {
-
- LOGGER.info("ContinuousQueryTaskPoolManager is initializing, thread number: {}", nThreads);
+ LOGGER.info(
+ "ContinuousQueryTaskPoolManager is initializing, thread number: {}",
+ CONTINUOUS_QUERY_THREAD_NUM);
pool =
new ThreadPoolExecutor(
- nThreads,
- nThreads,
+ CONTINUOUS_QUERY_THREAD_NUM,
+ CONTINUOUS_QUERY_THREAD_NUM,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(
@@ -81,8 +82,8 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
pool =
new ThreadPoolExecutor(
- nThreads,
- nThreads,
+ CONTINUOUS_QUERY_THREAD_NUM,
+ CONTINUOUS_QUERY_THREAD_NUM,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
index 53108bb..e846e98 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
@@ -27,4 +27,10 @@ public class ContinuousQueryException extends StorageEngineException {
super(message, TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode());
this.isUserException = true;
}
+
+ public ContinuousQueryException(String message, Exception e) {
+ super(message, e);
+ this.errorCode = TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode();
+ this.isUserException = true;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
index 14234d5..3013966 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.LogicalOperatorException;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectIntoOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateContinuousQueryOperator;
public class LogicalChecker {
@@ -37,5 +38,9 @@ public class LogicalChecker {
if (operator instanceof SelectIntoOperator) {
((SelectIntoOperator) operator).check();
}
+
+ if (operator instanceof CreateContinuousQueryOperator) {
+ ((CreateContinuousQueryOperator) operator).getQueryOperator().check();
+ }
}
}