You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/30 11:06:40 UTC
[iotdb] 01/03: DataSetFragmentExecutionPoolManager
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 85d1a91508fad44397d7a1dbb9fbc4631c477c57
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Nov 28 18:16:08 2021 +0800
DataSetFragmentExecutionPoolManager
---
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../db/query/dataset/udf/UDTFJoinDataSet.java | 7 +-
.../pool/DataSetFragmentExecutionPoolManager.java | 83 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 60179af..3a7f0eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -44,6 +44,7 @@ public enum ThreadName {
LOAD_TSFILE("Load-TsFile"),
TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
QUERY_SERVICE("Query"),
+ QUERY_FRAGMENT_SERVICE("Query-Fragment"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
CLUSTER_INFO_SERVICE("ClusterInfoClient"),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index fb39323..b9ac75f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.query.dataset.udf;
-import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -54,17 +53,19 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
public UDTFJoinDataSet(
UDTFDataSet[] fragmentDataSets, int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
- throws QueryProcessException, IOException {
+ throws IOException {
this.fragmentDataSets = fragmentDataSets;
this.resultColumnOutputIndexToFragmentDataSetOutputIndex =
resultColumnOutputIndexToFragmentDataSetOutputIndex;
resultColumnsLength = resultColumnOutputIndexToFragmentDataSetOutputIndex.length;
rowRecordsCache = new RowRecord[resultColumnsLength];
+
initTimeHeap();
}
- private void initTimeHeap() throws IOException, QueryProcessException {
+ private void initTimeHeap() throws IOException {
timeHeap = new TimeSelector(resultColumnsLength << 1, true);
+
for (int i = 0; i < resultColumnsLength; ++i) {
UDTFDataSet fragmentDataSet = fragmentDataSets[i];
if (fragmentDataSet.hasNextWithoutConstraint()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
new file mode 100644
index 0000000..0705380
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.pool;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.rescon.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
+ // This class is a singleton (see getInstance) holding the QUERY_FRAGMENT_SERVICE thread pool.
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(DataSetFragmentExecutionPoolManager.class);
+
+ private DataSetFragmentExecutionPoolManager() { // private: obtain the instance via getInstance()
+ pool = // not declared in this class, so inherited from AbstractPoolManager
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(), // presumably the size
+ ThreadName.QUERY_FRAGMENT_SERVICE.getName());
+ }
+
+ public static DataSetFragmentExecutionPoolManager getInstance() { // returns the shared instance
+ return InstanceHolder.INSTANCE;
+ }
+
+ @Override
+ public Logger getLogger() { // exposes this class's logger through the overridden accessor
+ return LOGGER;
+ }
+
+ @Override
+ public String getName() { // fixed descriptive label; its use by the base class is not shown here
+ return "query data set fragment execution task";
+ }
+
+ @Override
+ public void start() { // builds a pool only when none is held, e.g. after stop() cleared it
+ if (pool == null) {
+ pool = // same factory call and arguments as in the constructor
+ IoTDBThreadPoolFactory.newFixedThreadPool(
+ IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+ ThreadName.QUERY_FRAGMENT_SERVICE.getName());
+ }
+ }
+
+ @Override
+ public void stop() { // does nothing when no pool is currently held
+ if (pool != null) {
+ close(); // inherited; presumably shuts the pool down -- confirm in AbstractPoolManager
+ pool = null; // makes the null check in start() create a fresh pool next time
+ }
+ }
+
+ private static class InstanceHolder { // holder class: INSTANCE is created when it is initialized
+
+ private InstanceHolder() {
+ // intentionally empty: this class is not meant to be instantiated, it only holds INSTANCE
+ }
+
+ private static final DataSetFragmentExecutionPoolManager INSTANCE =
+ new DataSetFragmentExecutionPoolManager();
+ }
+}