You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/11 05:22:15 UTC

[iotdb] branch master updated: [IOTDB-2277] CQ: No warn message when aggregate function and timeseries types do not match (#4766)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9baeae3  [IOTDB-2277] CQ: No warn message when aggregate function and timeseries types do not match (#4766)
9baeae3 is described below

commit 9baeae3f5cd7b5037607e7e49cbc0a763ff17cb3
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jan 11 13:21:38 2022 +0800

    [IOTDB-2277] CQ: No warn message when aggregate function and timeseries types do not match (#4766)
---
 .../db/integration/IoTDBContinuousQueryIT.java     |   8 +-
 .../apache/iotdb/db/cq/ContinuousQueryService.java | 147 ++++++++++-----------
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    |  73 +++++-----
 .../db/cq/ContinuousQueryTaskPoolManager.java      |  15 ++-
 .../db/exception/ContinuousQueryException.java     |   6 +
 .../iotdb/db/qp/strategy/LogicalChecker.java       |   5 +
 6 files changed, 132 insertions(+), 122 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
index cbfff06..dbf91d7 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -127,12 +127,12 @@ public class IoTDBContinuousQueryIT {
             + "GROUP BY time(1s) END");
     statement.execute(
         "CREATE CONTINUOUS QUERY cq2 "
-            + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+            + "BEGIN SELECT avg(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
             + " GROUP BY time(1s), level=3 END");
     statement.execute(
         "CREATE CONTINUOUS QUERY cq3 "
             + "RESAMPLE EVERY 2s FOR 2s "
-            + "BEGIN SELECT avg(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
+            + "BEGIN SELECT min_value(temperature) INTO temperature_avg FROM root.ln.wf01.*.* "
             + "GROUP BY time(1s), level=2 END");
 
     statement.execute("DROP CONTINUOUS QUERY cq1");
@@ -166,11 +166,11 @@ public class IoTDBContinuousQueryIT {
 
     statement.execute(
         "CREATE CONTINUOUS QUERY cq1 "
-            + "BEGIN SELECT max_value(temperature) INTO temperature_max FROM root.ln.*.*.* "
+            + "BEGIN SELECT sum(temperature) INTO temperature_max FROM root.ln.*.*.* "
             + "GROUP BY time(1s) END");
     statement.execute(
         "CREATE CONTINUOUS QUERY cq2 "
-            + "BEGIN SELECT count(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
+            + "BEGIN SELECT avg(temperature) INTO temperature_cnt FROM root.ln.wf01.*.* "
             + " GROUP BY time(1s), level=3 END");
 
     checkShowContinuousQueriesResult(new String[] {"cq3", "cq1", "cq2"});
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index fb7a1b2..8f3e758 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.cq;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
-import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
@@ -43,39 +42,18 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public class ContinuousQueryService implements IService {
 
-  private static long CHECK_INTERVAL =
-      IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
 
-  private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryService.class);
+  private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
+      ContinuousQueryTaskPoolManager.getInstance();
+  private static final long TASK_SUBMIT_CHECK_INTERVAL =
+      IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+  private ScheduledExecutorService continuousQueryTaskSubmitThread;
 
   private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
       new ConcurrentHashMap<>();
-
   private final ConcurrentHashMap<String, Long> nextExecutionTimestamps = new ConcurrentHashMap<>();
 
-  private final ReentrantLock registrationLock = new ReentrantLock();
-
-  private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
-
-  private ScheduledExecutorService checkThread;
-
-  protected static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
-      ContinuousQueryTaskPoolManager.getInstance();
-
-  private ContinuousQueryService() {}
-
-  public static ContinuousQueryService getInstance() {
-    return INSTANCE;
-  }
-
-  public void acquireRegistrationLock() {
-    registrationLock.lock();
-  }
-
-  public void releaseRegistrationLock() {
-    registrationLock.unlock();
-  }
-
   @Override
   public ServiceType getID() {
     return ServiceType.CONTINUOUS_QUERY_SERVICE;
@@ -83,7 +61,6 @@ public class ContinuousQueryService implements IService {
 
   @Override
   public void start() {
-
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
       long durationFromCreation = DatetimeUtils.currentTime() - plan.getCreationTimestamp();
       long nextExecutionTimestamp =
@@ -94,76 +71,94 @@ public class ContinuousQueryService implements IService {
       nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
     }
 
-    checkThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Check");
-    checkThread.scheduleAtFixedRate(
+    continuousQueryTaskSubmitThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
+    continuousQueryTaskSubmitThread.scheduleAtFixedRate(
         this::checkAndSubmitTasks,
         0,
-        CHECK_INTERVAL,
+        TASK_SUBMIT_CHECK_INTERVAL,
         DatetimeUtils.timestampPrecisionStringToTimeUnit(
             IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));
 
-    logger.info("Continuous query service started.");
+    LOGGER.info("Continuous query service started.");
+  }
+
+  private void checkAndSubmitTasks() {
+    long currentTimestamp = DatetimeUtils.currentTime();
+    for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
+      long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
+      while (currentTimestamp >= nextExecutionTimestamp) {
+        TASK_POOL_MANAGER.submit(new ContinuousQueryTask(plan, nextExecutionTimestamp));
+        nextExecutionTimestamp += plan.getEveryInterval();
+      }
+      nextExecutionTimestamps.replace(plan.getContinuousQueryName(), nextExecutionTimestamp);
+    }
   }
 
   @Override
   public void stop() {
-    if (checkThread != null) {
-      checkThread.shutdown();
+    if (continuousQueryTaskSubmitThread != null) {
+      continuousQueryTaskSubmitThread.shutdown();
       try {
-        checkThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+        continuousQueryTaskSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
-        logger.warn("Check thread still doesn't exit after 60s");
-        checkThread.shutdownNow();
+        LOGGER.warn("Check thread still doesn't exit after 60s");
+        continuousQueryTaskSubmitThread.shutdownNow();
         Thread.currentThread().interrupt();
       }
     }
   }
 
-  @Override
-  public void shutdown(long milliseconds) throws ShutdownException {
-    stop();
-  }
+  private final ReentrantLock registrationLock = new ReentrantLock();
 
-  private void checkAndSubmitTasks() {
-    long currentTimestamp = DatetimeUtils.currentTime();
+  public void acquireRegistrationLock() {
+    registrationLock.lock();
+  }
 
-    for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
-      long nextExecutionTimestamp = nextExecutionTimestamps.get(plan.getContinuousQueryName());
-      while (currentTimestamp >= nextExecutionTimestamp) {
-        TASK_POOL_MANAGER.submit(new ContinuousQueryTask(plan, nextExecutionTimestamp));
-        nextExecutionTimestamp += plan.getEveryInterval();
-      }
-      nextExecutionTimestamps.replace(plan.getContinuousQueryName(), nextExecutionTimestamp);
-    }
+  public void releaseRegistrationLock() {
+    registrationLock.unlock();
   }
 
-  public boolean register(CreateContinuousQueryPlan plan, boolean writeLog)
+  public boolean register(CreateContinuousQueryPlan plan, boolean shouldWriteLog)
       throws ContinuousQueryException {
+    if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+      throw new ContinuousQueryException(
+          String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
+    }
 
-    acquireRegistrationLock();
+    // some exceptions will only occur at runtime
+    tryExecuteCQTaskOnceBeforeRegistration(plan);
 
+    acquireRegistrationLock();
     try {
-      if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
-        throw new ContinuousQueryException(
-            String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
-      }
-      if (writeLog) {
+      if (shouldWriteLog) {
         IoTDB.metaManager.createContinuousQuery(plan);
       }
       doRegister(plan);
-    } catch (ContinuousQueryException e) {
-      throw e;
+      return true;
     } catch (Exception e) {
       throw new ContinuousQueryException(e.getMessage());
     } finally {
       releaseRegistrationLock();
     }
-    return true;
+  }
+
+  private void tryExecuteCQTaskOnceBeforeRegistration(CreateContinuousQueryPlan plan)
+      throws ContinuousQueryException {
+    try {
+      new ContinuousQueryTask(plan, plan.getCreationTimestamp()).run();
+    } catch (Exception e) {
+      throw new ContinuousQueryException("Failed to create continuous query task.", e);
+    }
   }
 
   private void doRegister(CreateContinuousQueryPlan plan) {
     continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
-    nextExecutionTimestamps.put(plan.getContinuousQueryName(), plan.getCreationTimestamp());
+    // one cq task has been executed in tryExecuteCQTaskOnceBeforeRegistration
+    // so nextExecutionTimestamp should start with
+    //     plan.getCreationTimestamp() + plan.getEveryInterval()
+    nextExecutionTimestamps.put(
+        plan.getContinuousQueryName(), plan.getCreationTimestamp() + plan.getEveryInterval());
   }
 
   public void deregisterAll() throws ContinuousQueryException {
@@ -173,25 +168,21 @@ public class ContinuousQueryService implements IService {
   }
 
   public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
+    if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
+      throw new ContinuousQueryException(
+          String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
+    }
 
     acquireRegistrationLock();
-
     try {
-      if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
-        throw new ContinuousQueryException(
-            String.format("Continuous Query [%s] does not exist", plan.getContinuousQueryName()));
-      }
       IoTDB.metaManager.dropContinuousQuery(plan);
       doDeregister(plan);
-    } catch (ContinuousQueryException e) {
-      throw e;
+      return true;
     } catch (Exception e) {
       throw new ContinuousQueryException(e.getMessage());
     } finally {
       releaseRegistrationLock();
     }
-
-    return true;
   }
 
   private void doDeregister(DropContinuousQueryPlan plan) {
@@ -200,11 +191,8 @@ public class ContinuousQueryService implements IService {
   }
 
   public List<ShowContinuousQueriesResult> getShowContinuousQueriesResultList() {
-
     List<ShowContinuousQueriesResult> results = new ArrayList<>(continuousQueryPlans.size());
-
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
-
       results.add(
           new ShowContinuousQueriesResult(
               plan.getQuerySql(),
@@ -213,7 +201,14 @@ public class ContinuousQueryService implements IService {
               plan.getEveryInterval(),
               plan.getForInterval()));
     }
-
     return results;
   }
+
+  private ContinuousQueryService() {}
+
+  private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
+
+  public static ContinuousQueryService getInstance() {
+    return INSTANCE;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284..48d2aa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -19,13 +19,13 @@
 package org.apache.iotdb.db.cq;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,10 +43,12 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -54,30 +57,26 @@ import java.util.regex.Pattern;
 
 public class ContinuousQueryTask extends WrappedRunnable {
 
-  private static final int FETCH_SIZE = 10000;
-  private static final int BATCH_SIZE = 10000;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
 
-  private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryTask.class);
+  private static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
+  private static final int EXECUTION_BATCH_SIZE = 10000;
 
-  // To execute the query plan
-  private static PlanExecutor planExecutor;
+  // TODO: support CQ in cluster mode
+  private static BasicServiceProvider serviceProvider;
 
   static {
     try {
-      planExecutor = new PlanExecutor();
+      serviceProvider = new BasicServiceProvider();
     } catch (QueryProcessException e) {
-      logger.error(e.getMessage());
+      LOGGER.error(e.getMessage());
     }
   }
 
   // To save the continuous query info
   private final CreateContinuousQueryPlan plan;
-  // To transform query operator to query plan
-  private static final Planner planner = new Planner();
   // Next timestamp to execute a query
-  private long windowEndTimestamp;
-
-  private static final Pattern pattern = Pattern.compile("\\$\\{\\w+}");
+  private final long windowEndTimestamp;
 
   public ContinuousQueryTask(CreateContinuousQueryPlan plan, long windowEndTimestamp) {
     this.plan = plan;
@@ -87,31 +86,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
   @Override
   public void runMayThrow()
       throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
-          QueryFilterOptimizationException, MetadataException {
-
+          QueryFilterOptimizationException, MetadataException, TException, SQLException {
     GroupByTimePlan queryPlan = generateQueryPlan();
-
     if (queryPlan.getDeduplicatedPaths().isEmpty()) {
-      logger.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
+      LOGGER.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
       return;
     }
 
     QueryDataSet result = doQuery(queryPlan);
-
     if (result == null || result.getPaths().size() == 0) {
-      logger.info(plan.getContinuousQueryName() + ": query result empty");
+      LOGGER.info(plan.getContinuousQueryName() + ": query result empty");
       return;
     }
 
     doInsert(result, queryPlan);
   }
 
-  public void onRejection() {
-    logger.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
-  }
-
   private GroupByTimePlan generateQueryPlan() throws QueryProcessException {
-
     QueryOperator queryOperator = plan.getQueryOperator();
 
     // To handle the time series meta changes in different queries, i.e. creation & deletion,
@@ -121,7 +112,8 @@ public class ContinuousQueryTask extends WrappedRunnable {
     // we need to save one copy of the original SelectComponent.
     SelectComponent selectComponentCopy = new SelectComponent(queryOperator.getSelectComponent());
 
-    GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator);
+    GroupByTimePlan queryPlan =
+        serviceProvider.getPlanner().cqQueryOperatorToGroupByTimePlan(queryOperator);
 
     queryOperator.setSelectComponent(selectComponentCopy);
 
@@ -133,19 +125,26 @@ public class ContinuousQueryTask extends WrappedRunnable {
 
   private QueryDataSet doQuery(GroupByTimePlan queryPlan)
       throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
-          IOException, InterruptedException, QueryProcessException {
-    long queryId = QueryResourceManager.getInstance().assignQueryId(true);
-
+          IOException, InterruptedException, QueryProcessException, TException, SQLException {
+    final long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+    final QueryContext queryContext =
+        serviceProvider.genQueryContext(
+            queryId,
+            plan.isDebug(),
+            System.currentTimeMillis(),
+            "CQ plan",
+            IoTDBConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
     try {
-      return planExecutor.processQuery(queryPlan, new QueryContext(queryId));
+      return serviceProvider.createQueryDataSet(
+          queryContext, queryPlan, IoTDBConstant.DEFAULT_FETCH_SIZE);
     } finally {
       QueryResourceManager.getInstance().endQuery(queryId);
     }
   }
 
   private void doInsert(QueryDataSet result, GroupByTimePlan queryPlan)
-      throws QueryProcessException, IOException, IllegalPathException {
-
+      throws IOException, IllegalPathException, QueryProcessException, StorageGroupNotSetException,
+          StorageEngineException {
     int columnSize = result.getDataTypes().size();
     TSDataType dataType =
         TypeInferenceUtils.getAggrDataType(
@@ -156,7 +155,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     int batchSize =
         (int)
             Math.min(
-                BATCH_SIZE,
+                EXECUTION_BATCH_SIZE,
                 Math.ceil(
                     (float) plan.getForInterval()
                         / ((GroupByClauseComponent)
@@ -186,7 +185,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
           insertTabletPlans[i].setTimes(timestamps[i]);
           insertTabletPlans[i].setColumns(columns[i]);
           insertTabletPlans[i].setRowCount(rowNums[i]);
-          planExecutor.insertTablet(insertTabletPlans[i]);
+          serviceProvider.executeNonQuery(insertTabletPlans[i]);
         }
       }
     }
@@ -284,7 +283,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
       nodes[nodes.length - 1] = nodes[nodes.length - 1].substring(0, indexOfRightBracket);
     }
     StringBuffer sb = new StringBuffer();
-    Matcher m = pattern.matcher(this.plan.getTargetPath().getFullPath());
+    Matcher m = PATH_NODE_NAME_PATTERN.matcher(this.plan.getTargetPath().getFullPath());
     while (m.find()) {
       String param = m.group();
       String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -294,6 +293,10 @@ public class ContinuousQueryTask extends WrappedRunnable {
     return sb.toString();
   }
 
+  public void onRejection() {
+    LOGGER.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
+  }
+
   public CreateContinuousQueryPlan getCreateContinuousQueryPlan() {
     return plan;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
index e38324b..5b112d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
@@ -37,17 +37,18 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(ContinuousQueryTaskPoolManager.class);
 
-  private static final int nThreads =
+  private static final int CONTINUOUS_QUERY_THREAD_NUM =
       IoTDBDescriptor.getInstance().getConfig().getContinuousQueryThreadNum();
 
   private ContinuousQueryTaskPoolManager() {
-
-    LOGGER.info("ContinuousQueryTaskPoolManager is initializing, thread number: {}", nThreads);
+    LOGGER.info(
+        "ContinuousQueryTaskPoolManager is initializing, thread number: {}",
+        CONTINUOUS_QUERY_THREAD_NUM);
 
     pool =
         new ThreadPoolExecutor(
-            nThreads,
-            nThreads,
+            CONTINUOUS_QUERY_THREAD_NUM,
+            CONTINUOUS_QUERY_THREAD_NUM,
             0L,
             TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<>(
@@ -81,8 +82,8 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
 
     pool =
         new ThreadPoolExecutor(
-            nThreads,
-            nThreads,
+            CONTINUOUS_QUERY_THREAD_NUM,
+            CONTINUOUS_QUERY_THREAD_NUM,
             0L,
             TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<>(
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
index 53108bb..e846e98 100644
--- a/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
+++ b/server/src/main/java/org/apache/iotdb/db/exception/ContinuousQueryException.java
@@ -27,4 +27,10 @@ public class ContinuousQueryException extends StorageEngineException {
     super(message, TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode());
     this.isUserException = true;
   }
+
+  public ContinuousQueryException(String message, Exception e) {
+    super(message, e);
+    this.errorCode = TSStatusCode.CONTINUOUS_QUERY_ERROR.getStatusCode();
+    this.isUserException = true;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
index 14234d5..3013966 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/strategy/LogicalChecker.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.exception.query.LogicalOperatorException;
 import org.apache.iotdb.db.qp.logical.Operator;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectIntoOperator;
+import org.apache.iotdb.db.qp.logical.sys.CreateContinuousQueryOperator;
 
 public class LogicalChecker {
 
@@ -37,5 +38,9 @@ public class LogicalChecker {
     if (operator instanceof SelectIntoOperator) {
       ((SelectIntoOperator) operator).check();
     }
+
+    if (operator instanceof CreateContinuousQueryOperator) {
+      ((CreateContinuousQueryOperator) operator).getQueryOperator().check();
+    }
   }
 }