You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/17 10:52:27 UTC
[iotdb] branch master updated: [IOTDB-2075] Accelerate the process of insertTablets by using thread pool (#4502)
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a761a7f [IOTDB-2075] Accelerate the process of insertTablets by using thread pool (#4502)
a761a7f is described below
commit a761a7fe0dd8951cd9cf5b83bbbfcd033ab12639
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Fri Dec 17 18:51:57 2021 +0800
[IOTDB-2075] Accelerate the process of insertTablets by using thread pool (#4502)
---
.../resources/conf/iotdb-engine.properties | 12 +++
.../org/apache/iotdb/db/concurrent/ThreadName.java | 1 +
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ++++-
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 6 ++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 82 ++++++++++++++++++-
.../db/qp/physical/crud/InsertMultiTabletPlan.java | 64 +++++++++++++--
.../db/qp/physical/InsertTabletMultiPlanTest.java | 94 ++++++++++++++++++++++
7 files changed, 273 insertions(+), 8 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ac630c7..6d21884 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -862,6 +862,18 @@ timestamp_precision=ms
# Datatype: int
# select_into_insert_tablet_plan_row_limit=10000
+
+####################
+### Insert-Tablets Configuration
+####################
+
+# When the column count of a tablet in an insertTablets request reaches this threshold, the tablet
+# is considered relatively large. Multithreading may be enabled for a request only when at least
+# half of its tablets reach this threshold: small tablets are inserted quickly, so the cost of
+# switching between threads would outweigh the benefit of inserting them in parallel.
+# Datatype: int
+# insert_multi_tablet_enable_multithreading_column_threshold=10
+
####################
### Index Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index bcf7045..54a04b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -46,6 +46,7 @@ public enum ThreadName {
LOAD_TSFILE("Load-TsFile"),
TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
QUERY_SERVICE("Query"),
+ INSERTION_SERVICE("MultithreadingInsertionPool"),
WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
CLUSTER_INFO_SERVICE("ClusterInfoClient"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 18dc58d..5f58f0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -603,6 +603,14 @@ public class IoTDBConfig {
*/
private int selectIntoInsertTabletPlanRowLimit = 10000;
+  /**
+   * Minimum column count for a tablet in an insertTablets request to be considered large. Only
+   * when enough tablets of a request are large may they be inserted with multiple threads: small
+   * tablets are inserted quickly, so switching between threads would cost more than it saves.
+   * Configured by {@code insert_multi_tablet_enable_multithreading_column_threshold}.
+   */
+  private int insertMultiTabletEnableMultithreadingColumnThreshold = 10;
+
private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
/** Default system file storage is in local file system (unsupported) */
@@ -1531,12 +1539,22 @@ public class IoTDBConfig {
this.continuousQueryMinimumEveryInterval = minimumEveryInterval;
}
+  /** Sets the value configured by {@code select_into_insert_tablet_plan_row_limit}. */
+  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
+    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  }
+
+  /** Returns the value configured by {@code select_into_insert_tablet_plan_row_limit}. */
public int getSelectIntoInsertTabletPlanRowLimit() {
return selectIntoInsertTabletPlanRowLimit;
}
-  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
-    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  /**
+   * Returns the minimum tablet column count used when deciding whether an insertTablets request
+   * is inserted with multiple threads ({@code
+   * insert_multi_tablet_enable_multithreading_column_threshold}).
+   */
+  public int getInsertMultiTabletEnableMultithreadingColumnThreshold() {
+    return insertMultiTabletEnableMultithreadingColumnThreshold;
+  }
+
+  /**
+   * Sets the minimum tablet column count used when deciding whether an insertTablets request is
+   * inserted with multiple threads.
+   */
+  public void setInsertMultiTabletEnableMultithreadingColumnThreshold(
+      int insertMultiTabletEnableMultithreadingColumnThreshold) {
+    this.insertMultiTabletEnableMultithreadingColumnThreshold =
+        insertMultiTabletEnableMultithreadingColumnThreshold;
}
public int getMergeWriteThroughputMbPerSec() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5b7e2b2..986f829 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -738,6 +738,12 @@ public class IoTDBDescriptor {
"select_into_insert_tablet_plan_row_limit",
String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
+ conf.setInsertMultiTabletEnableMultithreadingColumnThreshold(
+ Integer.parseInt(
+ properties.getProperty(
+ "insert_multi_tablet_enable_multithreading_column_threshold",
+ String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
+
// At the same time, set TSFileConfig
TSFileDescriptor.getInstance()
.getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 935903c..633e8b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.cq.ContinuousQueryService;
@@ -144,6 +146,7 @@ import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.db.utils.UpgradeUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -178,6 +181,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.*;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
@@ -214,6 +218,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
public class PlanExecutor implements IPlanExecutor {
@@ -224,7 +229,9 @@ public class PlanExecutor implements IPlanExecutor {
// for data query
protected IQueryRouter queryRouter;
// for administration
- private IAuthorizer authorizer;
+  private final IAuthorizer authorizer;
+
+  /**
+   * Pool used by {@code insertTabletParallel}; created lazily and resized per request by {@code
+   * updateInsertTabletsPool}. NOTE(review): no shutdown of this pool is visible in this change —
+   * confirm its lifecycle.
+   */
+  private ThreadPoolExecutor insertionPool;
private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
@@ -1510,6 +1517,15 @@ public class PlanExecutor implements IPlanExecutor {
@Override
public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
throws QueryProcessException {
+ if (insertMultiTabletPlan.isEnableMultiThreading()) {
+ insertTabletParallel(insertMultiTabletPlan);
+ } else {
+ insertTabletSerial(insertMultiTabletPlan);
+ }
+ }
+
+ private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+ throws BatchProcessException {
for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
if (insertMultiTabletPlan.getResults().containsKey(i)
|| insertMultiTabletPlan.isExecuted(i)) {
@@ -1528,8 +1544,72 @@ public class PlanExecutor implements IPlanExecutor {
}
}
+  /**
+   * Inserts the pending tablets of {@code insertMultiTabletPlan} concurrently on the insertion
+   * pool. Tablets that already have a status in the plan's results, or are marked as executed, are
+   * skipped. Every tablet whose insertion fails gets a failure status recorded under its index.
+   *
+   * @throws BatchProcessException if the plan's results contain any failure status once all
+   *     submitted tablets have finished
+   */
+  private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
+    updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+
+    List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+    Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+
+    // submittedIndexes.get(k) is the position in planList of the task behind futureList.get(k).
+    List<Integer> submittedIndexes = new ArrayList<>();
+    List<Future<?>> futureList = new ArrayList<>();
+    for (int i = 0; i < planList.size(); i++) {
+      if (results.containsKey(i) || insertMultiTabletPlan.isExecuted(i)) {
+        continue;
+      }
+      InsertTabletPlan plan = planList.get(i);
+      submittedIndexes.add(i);
+      futureList.add(
+          insertionPool.submit(
+              () -> {
+                insertTablet(plan);
+                return null;
+              }));
+    }
+
+    // results is only written from this thread, after the corresponding task has finished.
+    for (int k = 0; k < futureList.size(); k++) {
+      Throwable failure = awaitTaskFailure(futureList.get(k));
+      if (failure == null) {
+        continue;
+      }
+      int realIndex = submittedIndexes.get(k);
+      if (failure instanceof QueryProcessException) {
+        QueryProcessException qe = (QueryProcessException) failure;
+        results.put(realIndex, RpcUtils.getStatus(qe.getErrorCode(), qe.getMessage()));
+      } else {
+        results.put(realIndex, RpcUtils.getStatus(INTERNAL_SERVER_ERROR, failure.toString()));
+      }
+    }
+
+    if (!results.isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+    }
+  }
+
+  /**
+   * Waits for {@code future} to finish and reports how it ended. An interrupt while waiting does
+   * not abandon the wait (the task keeps running in the pool, so its real outcome is still
+   * needed); instead the thread's interrupt flag is restored before returning.
+   *
+   * @return the exception thrown by the task, or {@code null} if it completed normally
+   */
+  private static Throwable awaitTaskFailure(Future<?> future) {
+    boolean interrupted = false;
+    try {
+      while (true) {
+        try {
+          future.get();
+          return null;
+        } catch (InterruptedException e) {
+          interrupted = true;
+        } catch (ExecutionException e) {
+          return e.getCause() == null ? e : e.getCause();
+        } catch (CancellationException e) {
+          return e;
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Ensures {@code insertionPool} exists and has {@code min(sgSize, availableProcessors() / 2)}
+   * threads, but never fewer than one. The pool is created on first use (or if it has terminated)
+   * and resized in place afterwards.
+   *
+   * <p>Synchronized so that concurrent callers cannot both create a pool (leaking one) or
+   * interleave the paired resize calls. NOTE(review): assumes one PlanExecutor may serve
+   * concurrent requests — confirm.
+   *
+   * @param sgSize number of distinct storage groups written by the current request
+   */
+  private synchronized void updateInsertTabletsPool(int sgSize) {
+    // availableProcessors() / 2 is 0 on a single-core host, and a fixed pool needs >= 1 thread.
+    int updateCoreSize =
+        Math.max(1, Math.min(sgSize, Runtime.getRuntime().availableProcessors() / 2));
+    if (insertionPool == null || insertionPool.isTerminated()) {
+      insertionPool =
+          (ThreadPoolExecutor)
+              IoTDBThreadPoolFactory.newFixedThreadPool(
+                  updateCoreSize, ThreadName.INSERTION_SERVICE.getName());
+    } else if (insertionPool.getCorePoolSize() > updateCoreSize) {
+      // Shrinking: lower the core size first so it never exceeds the maximum pool size.
+      insertionPool.setCorePoolSize(updateCoreSize);
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+    } else if (insertionPool.getCorePoolSize() < updateCoreSize) {
+      // Growing: raise the maximum first for the same reason.
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+      insertionPool.setCorePoolSize(updateCoreSize);
+    }
+  }
+
@Override
public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
+
if (insertTabletPlan.getRowCount() == 0) {
return;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index 07e3b85..5b6251a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -29,11 +31,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
/**
* Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
@@ -96,6 +94,10 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
boolean[] isExecuted;
+  /** Cached result of {@link #isEnableMultiThreading()}; {@code null} until first computed. */
+  Boolean isEnableMultithreading;
+
+  /**
+   * Cached result of {@link #getDifferentStorageGroupsCount()}; {@code null} until first computed.
+   * NOTE(review): neither cache is reset when tablets are added afterwards — confirm plans are not
+   * mutated after these values are first queried.
+   */
+  Integer differentStorageGroupsCount;
+
public InsertMultiTabletPlan() {
super(OperatorType.MULTI_BATCH_INSERT);
this.insertTabletPlanList = new ArrayList<>();
@@ -384,4 +386,56 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
results.remove(i);
}
}
+
+  /**
+   * Returns how many distinct storage-group prefixes the contained tablets write to, computed on
+   * the first call and cached. A tablet's prefix is the first {@code defaultStorageGroupLevel + 1}
+   * nodes of its device path (or the whole path if it is shorter).
+   */
+  public int getDifferentStorageGroupsCount() {
+    if (differentStorageGroupsCount == null) {
+      // Read the configured level: a freshly constructed IoTDBConfig only holds the compiled-in
+      // default and ignores iotdb-engine.properties.
+      int defaultStorageGroupLevel =
+          IoTDBDescriptor.getInstance().getConfig().getDefaultStorageGroupLevel();
+      Set<String> insertPlanSGSet = new HashSet<>();
+      for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+        String[] nodes = insertTabletPlan.getDeviceId().getNodes();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i <= defaultStorageGroupLevel && i < nodes.length; i++) {
+          stringBuilder.append(nodes[i]).append(".");
+        }
+        insertPlanSGSet.add(stringBuilder.toString());
+      }
+      differentStorageGroupsCount = insertPlanSGSet.size();
+    }
+    return differentStorageGroupsCount;
+  }
+
+  /**
+   * Returns whether the tablets of this plan should be inserted by multiple threads. The decision
+   * is computed on the first call and cached.
+   *
+   * <p>Switching between threads has a cost, so multithreading is enabled only when all of the
+   * following hold:
+   *
+   * <ul>
+   *   <li>the tablets belong to at least 2 distinct storage groups; with a single storage group
+   *       the insertions would be serialized by locking anyway;
+   *   <li>there are fewer than {@code 2 * availableProcessors()} distinct storage groups, because
+   *       too many storage groups may lead to failures allocating off-heap memory and NPEs;
+   *   <li>at least half of the tablets have at least {@code
+   *       insertMultiTabletEnableMultithreadingColumnThreshold} columns.
+   * </ul>
+   */
+  public boolean isEnableMultiThreading() {
+    if (isEnableMultithreading == null) {
+      int sgSize = getDifferentStorageGroupsCount();
+      int processors = Runtime.getRuntime().availableProcessors();
+      if (sgSize <= 1 || sgSize >= processors * 2) {
+        isEnableMultithreading = false;
+      } else {
+        int columnThreshold =
+            IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getInsertMultiTabletEnableMultithreadingColumnThreshold();
+        int largeTabletCount = 0;
+        for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+          if (insertTabletPlan.getColumns().length >= columnThreshold) {
+            largeTabletCount++;
+          }
+        }
+        isEnableMultithreading = largeTabletCount * 2 >= insertTabletPlanList.size();
+      }
+    }
+    return isEnableMultithreading;
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index dff65b4..062bbc3 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -101,4 +101,98 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
Assert.assertEquals(60, record.getFields().size());
}
}
+
+ @Test
+ public void testInsertMultiTabletPlanParallel()
+ throws QueryProcessException, MetadataException, StorageEngineException, IOException,
+ InterruptedException, QueryFilterOptimizationException {
+ long[] times =
+ new long[] {
+ 110L, 111L, 112L, 113L, 110L, 111L, 112L, 113L, 110L, 111L, 112L, 113L, 110L, 111L, 112L,
+ };
+ List<Integer> dataTypes = new ArrayList<>();
+ dataTypes.add(TSDataType.DOUBLE.ordinal());
+ dataTypes.add(TSDataType.FLOAT.ordinal());
+ dataTypes.add(TSDataType.INT64.ordinal());
+ dataTypes.add(TSDataType.INT32.ordinal());
+ dataTypes.add(TSDataType.BOOLEAN.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+ dataTypes.add(TSDataType.TEXT.ordinal());
+
+ Object[] columns = new Object[16];
+ int size = (times).length;
+ columns[0] = new double[size];
+ columns[1] = new float[size];
+ columns[2] = new long[size];
+ columns[3] = new int[size];
+ columns[4] = new boolean[size];
+ columns[5] = new Binary[size];
+ columns[6] = new Binary[size];
+ columns[7] = new Binary[size];
+ columns[8] = new Binary[size];
+ columns[9] = new Binary[size];
+ columns[10] = new Binary[size];
+ columns[11] = new Binary[size];
+ columns[12] = new Binary[size];
+ columns[13] = new Binary[size];
+ columns[14] = new Binary[size];
+ columns[15] = new Binary[size];
+
+ for (int r = 0; r < size; r++) {
+ ((double[]) columns[0])[r] = 1.0;
+ ((float[]) columns[1])[r] = 2;
+ ((long[]) columns[2])[r] = 10000;
+ ((int[]) columns[3])[r] = 100;
+ ((boolean[]) columns[4])[r] = false;
+ ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[6])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[7])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[8])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[9])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[10])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[11])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[12])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[13])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[14])[r] = new Binary("hh" + r);
+ ((Binary[]) columns[15])[r] = new Binary("hh" + r);
+ }
+
+ List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ InsertTabletPlan tabletPlan =
+ new InsertTabletPlan(
+ new PartialPath("root.multi" + i / 5 + ".d" + i),
+ new String[] {
+ "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10", "s11", "s12", "s13",
+ "s14", "s15", "s16"
+ },
+ dataTypes);
+ tabletPlan.setTimes(times);
+ tabletPlan.setColumns(columns);
+ tabletPlan.setRowCount(times.length);
+ insertTabletPlanList.add(tabletPlan);
+ }
+ PlanExecutor executor = new PlanExecutor();
+
+ InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+ Assert.assertTrue(insertMultiTabletPlan.isEnableMultiThreading());
+ executor.insertTablet(insertMultiTabletPlan);
+
+ QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.**");
+ QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+ Assert.assertEquals(160, dataSet.getPaths().size());
+ while (dataSet.hasNext()) {
+ RowRecord record = dataSet.next();
+ Assert.assertEquals(160, record.getFields().size());
+ }
+ }
}