You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/11/30 11:06:40 UTC

[iotdb] 01/03: DataSetFragmentExecutionPoolManager

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-1971
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 85d1a91508fad44397d7a1dbb9fbc4631c477c57
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sun Nov 28 18:16:08 2021 +0800

    DataSetFragmentExecutionPoolManager
---
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  1 +
 .../db/query/dataset/udf/UDTFJoinDataSet.java      |  7 +-
 .../pool/DataSetFragmentExecutionPoolManager.java  | 83 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index 60179af..3a7f0eb 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -44,6 +44,7 @@ public enum ThreadName {
   LOAD_TSFILE("Load-TsFile"),
   TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
   QUERY_SERVICE("Query"),
+  QUERY_FRAGMENT_SERVICE("Query-Fragment"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
   CLUSTER_INFO_SERVICE("ClusterInfoClient"),
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
index fb39323..b9ac75f 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/udf/UDTFJoinDataSet.java
@@ -19,7 +19,6 @@
 
 package org.apache.iotdb.db.query.dataset.udf;
 
-import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.query.dataset.DirectAlignByTimeDataSet;
 import org.apache.iotdb.db.tools.watermark.WatermarkEncoder;
 import org.apache.iotdb.db.utils.datastructure.TimeSelector;
@@ -54,17 +53,19 @@ public class UDTFJoinDataSet extends QueryDataSet implements DirectAlignByTimeDa
 
   public UDTFJoinDataSet(
       UDTFDataSet[] fragmentDataSets, int[][] resultColumnOutputIndexToFragmentDataSetOutputIndex)
-      throws QueryProcessException, IOException {
+      throws IOException {
     this.fragmentDataSets = fragmentDataSets;
     this.resultColumnOutputIndexToFragmentDataSetOutputIndex =
         resultColumnOutputIndexToFragmentDataSetOutputIndex;
     resultColumnsLength = resultColumnOutputIndexToFragmentDataSetOutputIndex.length;
     rowRecordsCache = new RowRecord[resultColumnsLength];
+
     initTimeHeap();
   }
 
-  private void initTimeHeap() throws IOException, QueryProcessException {
+  private void initTimeHeap() throws IOException {
     timeHeap = new TimeSelector(resultColumnsLength << 1, true);
+
     for (int i = 0; i < resultColumnsLength; ++i) {
       UDTFDataSet fragmentDataSet = fragmentDataSets[i];
       if (fragmentDataSet.hasNextWithoutConstraint()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
new file mode 100644
index 0000000..0705380
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/query/pool/DataSetFragmentExecutionPoolManager.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.pool;
+
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.rescon.AbstractPoolManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataSetFragmentExecutionPoolManager extends AbstractPoolManager {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(DataSetFragmentExecutionPoolManager.class);
+
+  private DataSetFragmentExecutionPoolManager() {
+    pool =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+            ThreadName.QUERY_FRAGMENT_SERVICE.getName());
+  }
+
+  public static DataSetFragmentExecutionPoolManager getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  @Override
+  public Logger getLogger() {
+    return LOGGER;
+  }
+
+  @Override
+  public String getName() {
+    return "query data set fragment execution task";
+  }
+
+  @Override
+  public void start() {
+    if (pool == null) {
+      pool =
+          IoTDBThreadPoolFactory.newFixedThreadPool(
+              IoTDBDescriptor.getInstance().getConfig().getConcurrentQueryThread(),
+              ThreadName.QUERY_FRAGMENT_SERVICE.getName());
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (pool != null) {
+      close();
+      pool = null;
+    }
+  }
+
+  private static class InstanceHolder {
+
+    private InstanceHolder() {
+      // do nothing
+    }
+
+    private static final DataSetFragmentExecutionPoolManager INSTANCE =
+        new DataSetFragmentExecutionPoolManager();
+  }
+}