You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/11 03:52:43 UTC

[iotdb] 01/03: refactor

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2277
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3fa935eeabdad91890c7a83417c1a996379f162d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jan 11 10:20:13 2022 +0800

    refactor
---
 .../apache/iotdb/db/cq/ContinuousQueryService.java | 112 +++++++++------------
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    |  73 +++++++-------
 .../db/cq/ContinuousQueryTaskPoolManager.java      |  15 +--
 3 files changed, 93 insertions(+), 107 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index fb7a1b2..0a6ac1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.cq;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.ContinuousQueryException;
-import org.apache.iotdb.db.exception.ShutdownException;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
@@ -43,39 +42,18 @@ import java.util.concurrent.locks.ReentrantLock;
 
 public class ContinuousQueryService implements IService {
 
-  private static long CHECK_INTERVAL =
-      IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
 
-  private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryService.class);
+  private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
+      ContinuousQueryTaskPoolManager.getInstance();
+  private static final long TASK_SUBMIT_CHECK_INTERVAL =
+      IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+  private ScheduledExecutorService cqTasksSubmitThread;
 
   private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
       new ConcurrentHashMap<>();
-
   private final ConcurrentHashMap<String, Long> nextExecutionTimestamps = new ConcurrentHashMap<>();
 
-  private final ReentrantLock registrationLock = new ReentrantLock();
-
-  private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
-
-  private ScheduledExecutorService checkThread;
-
-  protected static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
-      ContinuousQueryTaskPoolManager.getInstance();
-
-  private ContinuousQueryService() {}
-
-  public static ContinuousQueryService getInstance() {
-    return INSTANCE;
-  }
-
-  public void acquireRegistrationLock() {
-    registrationLock.lock();
-  }
-
-  public void releaseRegistrationLock() {
-    registrationLock.unlock();
-  }
-
   @Override
   public ServiceType getID() {
     return ServiceType.CONTINUOUS_QUERY_SERVICE;
@@ -83,7 +61,6 @@ public class ContinuousQueryService implements IService {
 
   @Override
   public void start() {
-
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
       long durationFromCreation = DatetimeUtils.currentTime() - plan.getCreationTimestamp();
       long nextExecutionTimestamp =
@@ -94,34 +71,16 @@ public class ContinuousQueryService implements IService {
       nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
     }
 
-    checkThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Check");
-    checkThread.scheduleAtFixedRate(
+    cqTasksSubmitThread =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
+    cqTasksSubmitThread.scheduleAtFixedRate(
         this::checkAndSubmitTasks,
         0,
-        CHECK_INTERVAL,
+        TASK_SUBMIT_CHECK_INTERVAL,
         DatetimeUtils.timestampPrecisionStringToTimeUnit(
             IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));
 
-    logger.info("Continuous query service started.");
-  }
-
-  @Override
-  public void stop() {
-    if (checkThread != null) {
-      checkThread.shutdown();
-      try {
-        checkThread.awaitTermination(600, TimeUnit.MILLISECONDS);
-      } catch (InterruptedException e) {
-        logger.warn("Check thread still doesn't exit after 60s");
-        checkThread.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  @Override
-  public void shutdown(long milliseconds) throws ShutdownException {
-    stop();
+    LOGGER.info("Continuous query service started.");
   }
 
   private void checkAndSubmitTasks() {
@@ -137,20 +96,43 @@ public class ContinuousQueryService implements IService {
     }
   }
 
-  public boolean register(CreateContinuousQueryPlan plan, boolean writeLog)
-      throws ContinuousQueryException {
+  @Override
+  public void stop() {
+    if (cqTasksSubmitThread != null) {
+      cqTasksSubmitThread.shutdown();
+      try {
+        cqTasksSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Check thread still doesn't exit after 60s");
+        cqTasksSubmitThread.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
 
-    acquireRegistrationLock();
+  private final ReentrantLock registrationLock = new ReentrantLock();
 
+  public void acquireRegistrationLock() {
+    registrationLock.lock();
+  }
+
+  public void releaseRegistrationLock() {
+    registrationLock.unlock();
+  }
+
+  public boolean register(CreateContinuousQueryPlan plan, boolean shouldWriteLog)
+      throws ContinuousQueryException {
+    acquireRegistrationLock();
     try {
       if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
         throw new ContinuousQueryException(
             String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
       }
-      if (writeLog) {
+      if (shouldWriteLog) {
         IoTDB.metaManager.createContinuousQuery(plan);
       }
       doRegister(plan);
+      return true;
     } catch (ContinuousQueryException e) {
       throw e;
     } catch (Exception e) {
@@ -158,7 +140,6 @@ public class ContinuousQueryService implements IService {
     } finally {
       releaseRegistrationLock();
     }
-    return true;
   }
 
   private void doRegister(CreateContinuousQueryPlan plan) {
@@ -173,9 +154,7 @@ public class ContinuousQueryService implements IService {
   }
 
   public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
-
     acquireRegistrationLock();
-
     try {
       if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
         throw new ContinuousQueryException(
@@ -183,6 +162,7 @@ public class ContinuousQueryService implements IService {
       }
       IoTDB.metaManager.dropContinuousQuery(plan);
       doDeregister(plan);
+      return true;
     } catch (ContinuousQueryException e) {
       throw e;
     } catch (Exception e) {
@@ -190,8 +170,6 @@ public class ContinuousQueryService implements IService {
     } finally {
       releaseRegistrationLock();
     }
-
-    return true;
   }
 
   private void doDeregister(DropContinuousQueryPlan plan) {
@@ -200,11 +178,8 @@ public class ContinuousQueryService implements IService {
   }
 
   public List<ShowContinuousQueriesResult> getShowContinuousQueriesResultList() {
-
     List<ShowContinuousQueriesResult> results = new ArrayList<>(continuousQueryPlans.size());
-
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
-
       results.add(
           new ShowContinuousQueriesResult(
               plan.getQuerySql(),
@@ -213,7 +188,14 @@ public class ContinuousQueryService implements IService {
               plan.getEveryInterval(),
               plan.getForInterval()));
     }
-
     return results;
   }
+
+  private ContinuousQueryService() {}
+
+  private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
+
+  public static ContinuousQueryService getInstance() {
+    return INSTANCE;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284..48d2aa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -19,13 +19,13 @@
 package org.apache.iotdb.db.cq;
 
 import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
 import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
 import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
 import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,10 +43,12 @@ import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
 import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -54,30 +57,26 @@ import java.util.regex.Pattern;
 
 public class ContinuousQueryTask extends WrappedRunnable {
 
-  private static final int FETCH_SIZE = 10000;
-  private static final int BATCH_SIZE = 10000;
+  private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
 
-  private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryTask.class);
+  private static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
+  private static final int EXECUTION_BATCH_SIZE = 10000;
 
-  // To execute the query plan
-  private static PlanExecutor planExecutor;
+  // TODO: support CQ in cluster mode
+  private static BasicServiceProvider serviceProvider;
 
   static {
     try {
-      planExecutor = new PlanExecutor();
+      serviceProvider = new BasicServiceProvider();
     } catch (QueryProcessException e) {
-      logger.error(e.getMessage());
+      LOGGER.error(e.getMessage());
     }
   }
 
   // To save the continuous query info
   private final CreateContinuousQueryPlan plan;
-  // To transform query operator to query plan
-  private static final Planner planner = new Planner();
   // Next timestamp to execute a query
-  private long windowEndTimestamp;
-
-  private static final Pattern pattern = Pattern.compile("\\$\\{\\w+}");
+  private final long windowEndTimestamp;
 
   public ContinuousQueryTask(CreateContinuousQueryPlan plan, long windowEndTimestamp) {
     this.plan = plan;
@@ -87,31 +86,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
   @Override
   public void runMayThrow()
       throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
-          QueryFilterOptimizationException, MetadataException {
-
+          QueryFilterOptimizationException, MetadataException, TException, SQLException {
     GroupByTimePlan queryPlan = generateQueryPlan();
-
     if (queryPlan.getDeduplicatedPaths().isEmpty()) {
-      logger.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
+      LOGGER.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
       return;
     }
 
     QueryDataSet result = doQuery(queryPlan);
-
     if (result == null || result.getPaths().size() == 0) {
-      logger.info(plan.getContinuousQueryName() + ": query result empty");
+      LOGGER.info(plan.getContinuousQueryName() + ": query result empty");
       return;
     }
 
     doInsert(result, queryPlan);
   }
 
-  public void onRejection() {
-    logger.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
-  }
-
   private GroupByTimePlan generateQueryPlan() throws QueryProcessException {
-
     QueryOperator queryOperator = plan.getQueryOperator();
 
     // To handle the time series meta changes in different queries, i.e. creation & deletion,
@@ -121,7 +112,8 @@ public class ContinuousQueryTask extends WrappedRunnable {
     // we need to save one copy of the original SelectComponent.
     SelectComponent selectComponentCopy = new SelectComponent(queryOperator.getSelectComponent());
 
-    GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator);
+    GroupByTimePlan queryPlan =
+        serviceProvider.getPlanner().cqQueryOperatorToGroupByTimePlan(queryOperator);
 
     queryOperator.setSelectComponent(selectComponentCopy);
 
@@ -133,19 +125,26 @@ public class ContinuousQueryTask extends WrappedRunnable {
 
   private QueryDataSet doQuery(GroupByTimePlan queryPlan)
       throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
-          IOException, InterruptedException, QueryProcessException {
-    long queryId = QueryResourceManager.getInstance().assignQueryId(true);
-
+          IOException, InterruptedException, QueryProcessException, TException, SQLException {
+    final long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+    final QueryContext queryContext =
+        serviceProvider.genQueryContext(
+            queryId,
+            plan.isDebug(),
+            System.currentTimeMillis(),
+            "CQ plan",
+            IoTDBConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
     try {
-      return planExecutor.processQuery(queryPlan, new QueryContext(queryId));
+      return serviceProvider.createQueryDataSet(
+          queryContext, queryPlan, IoTDBConstant.DEFAULT_FETCH_SIZE);
     } finally {
       QueryResourceManager.getInstance().endQuery(queryId);
     }
   }
 
   private void doInsert(QueryDataSet result, GroupByTimePlan queryPlan)
-      throws QueryProcessException, IOException, IllegalPathException {
-
+      throws IOException, IllegalPathException, QueryProcessException, StorageGroupNotSetException,
+          StorageEngineException {
     int columnSize = result.getDataTypes().size();
     TSDataType dataType =
         TypeInferenceUtils.getAggrDataType(
@@ -156,7 +155,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     int batchSize =
         (int)
             Math.min(
-                BATCH_SIZE,
+                EXECUTION_BATCH_SIZE,
                 Math.ceil(
                     (float) plan.getForInterval()
                         / ((GroupByClauseComponent)
@@ -186,7 +185,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
           insertTabletPlans[i].setTimes(timestamps[i]);
           insertTabletPlans[i].setColumns(columns[i]);
           insertTabletPlans[i].setRowCount(rowNums[i]);
-          planExecutor.insertTablet(insertTabletPlans[i]);
+          serviceProvider.executeNonQuery(insertTabletPlans[i]);
         }
       }
     }
@@ -284,7 +283,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
       nodes[nodes.length - 1] = nodes[nodes.length - 1].substring(0, indexOfRightBracket);
     }
     StringBuffer sb = new StringBuffer();
-    Matcher m = pattern.matcher(this.plan.getTargetPath().getFullPath());
+    Matcher m = PATH_NODE_NAME_PATTERN.matcher(this.plan.getTargetPath().getFullPath());
     while (m.find()) {
       String param = m.group();
       String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -294,6 +293,10 @@ public class ContinuousQueryTask extends WrappedRunnable {
     return sb.toString();
   }
 
+  public void onRejection() {
+    LOGGER.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
+  }
+
   public CreateContinuousQueryPlan getCreateContinuousQueryPlan() {
     return plan;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
index e38324b..5b112d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
@@ -37,17 +37,18 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
   private static final Logger LOGGER =
       LoggerFactory.getLogger(ContinuousQueryTaskPoolManager.class);
 
-  private static final int nThreads =
+  private static final int CONTINUOUS_QUERY_THREAD_NUM =
       IoTDBDescriptor.getInstance().getConfig().getContinuousQueryThreadNum();
 
   private ContinuousQueryTaskPoolManager() {
-
-    LOGGER.info("ContinuousQueryTaskPoolManager is initializing, thread number: {}", nThreads);
+    LOGGER.info(
+        "ContinuousQueryTaskPoolManager is initializing, thread number: {}",
+        CONTINUOUS_QUERY_THREAD_NUM);
 
     pool =
         new ThreadPoolExecutor(
-            nThreads,
-            nThreads,
+            CONTINUOUS_QUERY_THREAD_NUM,
+            CONTINUOUS_QUERY_THREAD_NUM,
             0L,
             TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<>(
@@ -81,8 +82,8 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
 
     pool =
         new ThreadPoolExecutor(
-            nThreads,
-            nThreads,
+            CONTINUOUS_QUERY_THREAD_NUM,
+            CONTINUOUS_QUERY_THREAD_NUM,
             0L,
             TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<>(