You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/11 03:52:43 UTC
[iotdb] 01/03: refactor
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-2277
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3fa935eeabdad91890c7a83417c1a996379f162d
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Tue Jan 11 10:20:13 2022 +0800
refactor
---
.../apache/iotdb/db/cq/ContinuousQueryService.java | 112 +++++++++------------
.../apache/iotdb/db/cq/ContinuousQueryTask.java | 73 +++++++-------
.../db/cq/ContinuousQueryTaskPoolManager.java | 15 +--
3 files changed, 93 insertions(+), 107 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index fb7a1b2..0a6ac1f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.cq;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.ContinuousQueryException;
-import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
@@ -43,39 +42,18 @@ import java.util.concurrent.locks.ReentrantLock;
public class ContinuousQueryService implements IService {
- private static long CHECK_INTERVAL =
- IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
- private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryService.class);
+ private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
+ ContinuousQueryTaskPoolManager.getInstance();
+ private static final long TASK_SUBMIT_CHECK_INTERVAL =
+ IoTDBDescriptor.getInstance().getConfig().getContinuousQueryMinimumEveryInterval() / 2;
+ private ScheduledExecutorService cqTasksSubmitThread;
private final ConcurrentHashMap<String, CreateContinuousQueryPlan> continuousQueryPlans =
new ConcurrentHashMap<>();
-
private final ConcurrentHashMap<String, Long> nextExecutionTimestamps = new ConcurrentHashMap<>();
- private final ReentrantLock registrationLock = new ReentrantLock();
-
- private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
-
- private ScheduledExecutorService checkThread;
-
- protected static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
- ContinuousQueryTaskPoolManager.getInstance();
-
- private ContinuousQueryService() {}
-
- public static ContinuousQueryService getInstance() {
- return INSTANCE;
- }
-
- public void acquireRegistrationLock() {
- registrationLock.lock();
- }
-
- public void releaseRegistrationLock() {
- registrationLock.unlock();
- }
-
@Override
public ServiceType getID() {
return ServiceType.CONTINUOUS_QUERY_SERVICE;
@@ -83,7 +61,6 @@ public class ContinuousQueryService implements IService {
@Override
public void start() {
-
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
long durationFromCreation = DatetimeUtils.currentTime() - plan.getCreationTimestamp();
long nextExecutionTimestamp =
@@ -94,34 +71,16 @@ public class ContinuousQueryService implements IService {
nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
}
- checkThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Check");
- checkThread.scheduleAtFixedRate(
+ cqTasksSubmitThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("CQ-Task-Submit-Thread");
+ cqTasksSubmitThread.scheduleAtFixedRate(
this::checkAndSubmitTasks,
0,
- CHECK_INTERVAL,
+ TASK_SUBMIT_CHECK_INTERVAL,
DatetimeUtils.timestampPrecisionStringToTimeUnit(
IoTDBDescriptor.getInstance().getConfig().getTimestampPrecision()));
- logger.info("Continuous query service started.");
- }
-
- @Override
- public void stop() {
- if (checkThread != null) {
- checkThread.shutdown();
- try {
- checkThread.awaitTermination(600, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- logger.warn("Check thread still doesn't exit after 60s");
- checkThread.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
- }
-
- @Override
- public void shutdown(long milliseconds) throws ShutdownException {
- stop();
+ LOGGER.info("Continuous query service started.");
}
private void checkAndSubmitTasks() {
@@ -137,20 +96,43 @@ public class ContinuousQueryService implements IService {
}
}
- public boolean register(CreateContinuousQueryPlan plan, boolean writeLog)
- throws ContinuousQueryException {
+ @Override
+ public void stop() {
+ if (cqTasksSubmitThread != null) {
+ cqTasksSubmitThread.shutdown();
+ try {
+ cqTasksSubmitThread.awaitTermination(600, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Check thread still doesn't exit after 60s");
+ cqTasksSubmitThread.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
- acquireRegistrationLock();
+ private final ReentrantLock registrationLock = new ReentrantLock();
+ public void acquireRegistrationLock() {
+ registrationLock.lock();
+ }
+
+ public void releaseRegistrationLock() {
+ registrationLock.unlock();
+ }
+
+ public boolean register(CreateContinuousQueryPlan plan, boolean shouldWriteLog)
+ throws ContinuousQueryException {
+ acquireRegistrationLock();
try {
if (continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
throw new ContinuousQueryException(
String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
}
- if (writeLog) {
+ if (shouldWriteLog) {
IoTDB.metaManager.createContinuousQuery(plan);
}
doRegister(plan);
+ return true;
} catch (ContinuousQueryException e) {
throw e;
} catch (Exception e) {
@@ -158,7 +140,6 @@ public class ContinuousQueryService implements IService {
} finally {
releaseRegistrationLock();
}
- return true;
}
private void doRegister(CreateContinuousQueryPlan plan) {
@@ -173,9 +154,7 @@ public class ContinuousQueryService implements IService {
}
public boolean deregister(DropContinuousQueryPlan plan) throws ContinuousQueryException {
-
acquireRegistrationLock();
-
try {
if (!continuousQueryPlans.containsKey(plan.getContinuousQueryName())) {
throw new ContinuousQueryException(
@@ -183,6 +162,7 @@ public class ContinuousQueryService implements IService {
}
IoTDB.metaManager.dropContinuousQuery(plan);
doDeregister(plan);
+ return true;
} catch (ContinuousQueryException e) {
throw e;
} catch (Exception e) {
@@ -190,8 +170,6 @@ public class ContinuousQueryService implements IService {
} finally {
releaseRegistrationLock();
}
-
- return true;
}
private void doDeregister(DropContinuousQueryPlan plan) {
@@ -200,11 +178,8 @@ public class ContinuousQueryService implements IService {
}
public List<ShowContinuousQueriesResult> getShowContinuousQueriesResultList() {
-
List<ShowContinuousQueriesResult> results = new ArrayList<>(continuousQueryPlans.size());
-
for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
-
results.add(
new ShowContinuousQueriesResult(
plan.getQuerySql(),
@@ -213,7 +188,14 @@ public class ContinuousQueryService implements IService {
plan.getEveryInterval(),
plan.getForInterval()));
}
-
return results;
}
+
+ private ContinuousQueryService() {}
+
+ private static final ContinuousQueryService INSTANCE = new ContinuousQueryService();
+
+ public static ContinuousQueryService getInstance() {
+ return INSTANCE;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 154c284..48d2aa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.db.cq;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
-import org.apache.iotdb.db.qp.Planner;
-import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.logical.crud.GroupByClauseComponent;
import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.logical.crud.SelectComponent;
@@ -34,6 +34,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
+import org.apache.iotdb.db.service.basic.BasicServiceProvider;
import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -42,10 +43,12 @@ import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -54,30 +57,26 @@ import java.util.regex.Pattern;
public class ContinuousQueryTask extends WrappedRunnable {
- private static final int FETCH_SIZE = 10000;
- private static final int BATCH_SIZE = 10000;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
- private static final Logger logger = LoggerFactory.getLogger(ContinuousQueryTask.class);
+ private static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
+ private static final int EXECUTION_BATCH_SIZE = 10000;
- // To execute the query plan
- private static PlanExecutor planExecutor;
+ // TODO: support CQ in cluster mode
+ private static BasicServiceProvider serviceProvider;
static {
try {
- planExecutor = new PlanExecutor();
+ serviceProvider = new BasicServiceProvider();
} catch (QueryProcessException e) {
- logger.error(e.getMessage());
+ LOGGER.error(e.getMessage());
}
}
// To save the continuous query info
private final CreateContinuousQueryPlan plan;
- // To transform query operator to query plan
- private static final Planner planner = new Planner();
// Next timestamp to execute a query
- private long windowEndTimestamp;
-
- private static final Pattern pattern = Pattern.compile("\\$\\{\\w+}");
+ private final long windowEndTimestamp;
public ContinuousQueryTask(CreateContinuousQueryPlan plan, long windowEndTimestamp) {
this.plan = plan;
@@ -87,31 +86,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
@Override
public void runMayThrow()
throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
- QueryFilterOptimizationException, MetadataException {
-
+ QueryFilterOptimizationException, MetadataException, TException, SQLException {
GroupByTimePlan queryPlan = generateQueryPlan();
-
if (queryPlan.getDeduplicatedPaths().isEmpty()) {
- logger.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
+ LOGGER.info(plan.getContinuousQueryName() + ": deduplicated paths empty");
return;
}
QueryDataSet result = doQuery(queryPlan);
-
if (result == null || result.getPaths().size() == 0) {
- logger.info(plan.getContinuousQueryName() + ": query result empty");
+ LOGGER.info(plan.getContinuousQueryName() + ": query result empty");
return;
}
doInsert(result, queryPlan);
}
- public void onRejection() {
- logger.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
- }
-
private GroupByTimePlan generateQueryPlan() throws QueryProcessException {
-
QueryOperator queryOperator = plan.getQueryOperator();
// To handle the time series meta changes in different queries, i.e. creation & deletion,
@@ -121,7 +112,8 @@ public class ContinuousQueryTask extends WrappedRunnable {
// we need to save one copy of the original SelectComponent.
SelectComponent selectComponentCopy = new SelectComponent(queryOperator.getSelectComponent());
- GroupByTimePlan queryPlan = planner.cqQueryOperatorToGroupByTimePlan(queryOperator);
+ GroupByTimePlan queryPlan =
+ serviceProvider.getPlanner().cqQueryOperatorToGroupByTimePlan(queryOperator);
queryOperator.setSelectComponent(selectComponentCopy);
@@ -133,19 +125,26 @@ public class ContinuousQueryTask extends WrappedRunnable {
private QueryDataSet doQuery(GroupByTimePlan queryPlan)
throws StorageEngineException, QueryFilterOptimizationException, MetadataException,
- IOException, InterruptedException, QueryProcessException {
- long queryId = QueryResourceManager.getInstance().assignQueryId(true);
-
+ IOException, InterruptedException, QueryProcessException, TException, SQLException {
+ final long queryId = QueryResourceManager.getInstance().assignQueryId(true);
+ final QueryContext queryContext =
+ serviceProvider.genQueryContext(
+ queryId,
+ plan.isDebug(),
+ System.currentTimeMillis(),
+ "CQ plan",
+ IoTDBConstant.DEFAULT_CONNECTION_TIMEOUT_MS);
try {
- return planExecutor.processQuery(queryPlan, new QueryContext(queryId));
+ return serviceProvider.createQueryDataSet(
+ queryContext, queryPlan, IoTDBConstant.DEFAULT_FETCH_SIZE);
} finally {
QueryResourceManager.getInstance().endQuery(queryId);
}
}
private void doInsert(QueryDataSet result, GroupByTimePlan queryPlan)
- throws QueryProcessException, IOException, IllegalPathException {
-
+ throws IOException, IllegalPathException, QueryProcessException, StorageGroupNotSetException,
+ StorageEngineException {
int columnSize = result.getDataTypes().size();
TSDataType dataType =
TypeInferenceUtils.getAggrDataType(
@@ -156,7 +155,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
int batchSize =
(int)
Math.min(
- BATCH_SIZE,
+ EXECUTION_BATCH_SIZE,
Math.ceil(
(float) plan.getForInterval()
/ ((GroupByClauseComponent)
@@ -186,7 +185,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
insertTabletPlans[i].setTimes(timestamps[i]);
insertTabletPlans[i].setColumns(columns[i]);
insertTabletPlans[i].setRowCount(rowNums[i]);
- planExecutor.insertTablet(insertTabletPlans[i]);
+ serviceProvider.executeNonQuery(insertTabletPlans[i]);
}
}
}
@@ -284,7 +283,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
nodes[nodes.length - 1] = nodes[nodes.length - 1].substring(0, indexOfRightBracket);
}
StringBuffer sb = new StringBuffer();
- Matcher m = pattern.matcher(this.plan.getTargetPath().getFullPath());
+ Matcher m = PATH_NODE_NAME_PATTERN.matcher(this.plan.getTargetPath().getFullPath());
while (m.find()) {
String param = m.group();
String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -294,6 +293,10 @@ public class ContinuousQueryTask extends WrappedRunnable {
return sb.toString();
}
+ public void onRejection() {
+ LOGGER.warn("Continuous Query Task {} rejected", plan.getContinuousQueryName());
+ }
+
public CreateContinuousQueryPlan getCreateContinuousQueryPlan() {
return plan;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
index e38324b..5b112d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
@@ -37,17 +37,18 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ContinuousQueryTaskPoolManager.class);
- private static final int nThreads =
+ private static final int CONTINUOUS_QUERY_THREAD_NUM =
IoTDBDescriptor.getInstance().getConfig().getContinuousQueryThreadNum();
private ContinuousQueryTaskPoolManager() {
-
- LOGGER.info("ContinuousQueryTaskPoolManager is initializing, thread number: {}", nThreads);
+ LOGGER.info(
+ "ContinuousQueryTaskPoolManager is initializing, thread number: {}",
+ CONTINUOUS_QUERY_THREAD_NUM);
pool =
new ThreadPoolExecutor(
- nThreads,
- nThreads,
+ CONTINUOUS_QUERY_THREAD_NUM,
+ CONTINUOUS_QUERY_THREAD_NUM,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(
@@ -81,8 +82,8 @@ public class ContinuousQueryTaskPoolManager extends AbstractPoolManager {
pool =
new ThreadPoolExecutor(
- nThreads,
- nThreads,
+ CONTINUOUS_QUERY_THREAD_NUM,
+ CONTINUOUS_QUERY_THREAD_NUM,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(