You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2021/12/17 10:52:27 UTC

[iotdb] branch master updated: [IOTDB-2075] Accelerate the process of insertTablets by using thread pool (#4502)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a761a7f  [IOTDB-2075] Accelerate the process of insertTablets by using thread pool (#4502)
a761a7f is described below

commit a761a7fe0dd8951cd9cf5b83bbbfcd033ab12639
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Fri Dec 17 18:51:57 2021 +0800

    [IOTDB-2075] Accelerate the process of insertTablets by using thread pool (#4502)
---
 .../resources/conf/iotdb-engine.properties         | 12 +++
 .../org/apache/iotdb/db/concurrent/ThreadName.java |  1 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 22 ++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  6 ++
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  | 82 ++++++++++++++++++-
 .../db/qp/physical/crud/InsertMultiTabletPlan.java | 64 +++++++++++++--
 .../db/qp/physical/InsertTabletMultiPlanTest.java  | 94 ++++++++++++++++++++++
 7 files changed, 273 insertions(+), 8 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index ac630c7..6d21884 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -862,6 +862,18 @@ timestamp_precision=ms
 # Datatype: int
 # select_into_insert_tablet_plan_row_limit=10000
 
+
+####################
+### Insert-Tablets Configuration
+####################
+
+# Column-count threshold at or above which a tablet is considered relatively large.
+# Small tablets are inserted quickly, so inserting them in parallel would cost more in thread switching than it gains.
+# Multithreading is therefore only used for a multi-tablet insertion when at least half of its tablets reach this threshold
+# (other conditions, such as the tablets spanning more than one storage group, must also hold).
+# Datatype: int
+# insert_multi_tablet_enable_multithreading_column_threshold=10
+
 ####################
 ### Index Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
index bcf7045..54a04b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/concurrent/ThreadName.java
@@ -46,6 +46,7 @@ public enum ThreadName {
   LOAD_TSFILE("Load-TsFile"),
   TIME_COST_STATISTIC("TIME_COST_STATISTIC"),
   QUERY_SERVICE("Query"),
+  INSERTION_SERVICE("MultithreadingInsertionPool"),
   WINDOW_EVALUATION_SERVICE("WindowEvaluationTaskPoolManager"),
   CONTINUOUS_QUERY_SERVICE("ContinuousQueryTaskPoolManager"),
   CLUSTER_INFO_SERVICE("ClusterInfoClient"),
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 18dc58d..5f58f0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -603,6 +603,14 @@ public class IoTDBConfig {
    */
   private int selectIntoInsertTabletPlanRowLimit = 10000;
 
+  /**
+   * Column-count threshold at or above which a tablet is considered relatively large. An
+   * InsertMultiTabletPlan is only inserted by multiple threads when at least half of its tablets
+   * reach this threshold: small tablets are inserted quickly, so the cost of switching between
+   * threads would outweigh the gain from running them in parallel.
+   */
+  private int insertMultiTabletEnableMultithreadingColumnThreshold = 10;
+
   private MergeFileStrategy mergeFileStrategy = MergeFileStrategy.MAX_SERIES_NUM;
 
   /** Default system file storage is in local file system (unsupported) */
@@ -1531,12 +1539,22 @@ public class IoTDBConfig {
     this.continuousQueryMinimumEveryInterval = minimumEveryInterval;
   }
 
+  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
+    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  }
+
   public int getSelectIntoInsertTabletPlanRowLimit() {
     return selectIntoInsertTabletPlanRowLimit;
   }
 
-  public void setSelectIntoInsertTabletPlanRowLimit(int selectIntoInsertTabletPlanRowLimit) {
-    this.selectIntoInsertTabletPlanRowLimit = selectIntoInsertTabletPlanRowLimit;
+  public int getInsertMultiTabletEnableMultithreadingColumnThreshold() {
+    return insertMultiTabletEnableMultithreadingColumnThreshold;
+  }
+
+  public void setInsertMultiTabletEnableMultithreadingColumnThreshold(
+      int insertMultiTabletEnableMultithreadingColumnThreshold) {
+    this.insertMultiTabletEnableMultithreadingColumnThreshold =
+        insertMultiTabletEnableMultithreadingColumnThreshold;
   }
 
   public int getMergeWriteThroughputMbPerSec() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 5b7e2b2..986f829 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -738,6 +738,12 @@ public class IoTDBDescriptor {
                   "select_into_insert_tablet_plan_row_limit",
                   String.valueOf(conf.getSelectIntoInsertTabletPlanRowLimit()))));
 
+      conf.setInsertMultiTabletEnableMultithreadingColumnThreshold(
+          Integer.parseInt(
+              properties.getProperty(
+                  "insert_multi_tablet_enable_multithreading_column_threshold",
+                  String.valueOf(conf.getInsertMultiTabletEnableMultithreadingColumnThreshold()))));
+
       // At the same time, set TSFileConfig
       TSFileDescriptor.getInstance()
           .getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 935903c..633e8b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.auth.authorizer.IAuthorizer;
 import org.apache.iotdb.db.auth.entity.PathPrivilege;
 import org.apache.iotdb.db.auth.entity.Role;
 import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.cq.ContinuousQueryService;
@@ -144,6 +146,7 @@ import org.apache.iotdb.db.utils.TypeInferenceUtils;
 import org.apache.iotdb.db.utils.UpgradeUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
@@ -178,6 +181,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.*;
 
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CANCELLED;
 import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
@@ -214,6 +218,7 @@ import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
 import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
 import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
 import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
+import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
 
 public class PlanExecutor implements IPlanExecutor {
@@ -224,7 +229,9 @@ public class PlanExecutor implements IPlanExecutor {
   // for data query
   protected IQueryRouter queryRouter;
   // for administration
-  private IAuthorizer authorizer;
+  private final IAuthorizer authorizer;
+
+  private ThreadPoolExecutor insertionPool; // lazily created/resized in updateInsertTabletsPool()
 
   private static final String INSERT_MEASUREMENTS_FAILED_MESSAGE = "failed to insert measurements ";
 
@@ -1510,6 +1517,15 @@ public class PlanExecutor implements IPlanExecutor {
   @Override
   public void insertTablet(InsertMultiTabletPlan insertMultiTabletPlan)
       throws QueryProcessException {
+    if (insertMultiTabletPlan.isEnableMultiThreading()) { // decision is cached by the plan
+      insertTabletParallel(insertMultiTabletPlan);
+    } else {
+      insertTabletSerial(insertMultiTabletPlan);
+    }
+  }
+
+  private void insertTabletSerial(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
     for (int i = 0; i < insertMultiTabletPlan.getInsertTabletPlanList().size(); i++) {
       if (insertMultiTabletPlan.getResults().containsKey(i)
           || insertMultiTabletPlan.isExecuted(i)) {
@@ -1528,8 +1544,94 @@ public class PlanExecutor implements IPlanExecutor {
       }
     }
 
+  /**
+   * Inserts, on {@code insertionPool}, every tablet of {@code insertMultiTabletPlan} that has
+   * neither a recorded result nor been executed yet.
+   *
+   * <p>For every tablet whose insertion fails, a failure status is put into the plan's results.
+   * Once all submitted tablets have been waited for, a {@link BatchProcessException} is thrown if
+   * the result map is not empty.
+   */
+  private void insertTabletParallel(InsertMultiTabletPlan insertMultiTabletPlan)
+      throws BatchProcessException {
+    updateInsertTabletsPool(insertMultiTabletPlan.getDifferentStorageGroupsCount());
+
+    List<InsertTabletPlan> planList = insertMultiTabletPlan.getInsertTabletPlanList();
+    Map<Integer, TSStatus> results = insertMultiTabletPlan.getResults();
+
+    // Index (in planList) of each submitted tablet, aligned with futureList, so that failures are
+    // reported against the right tablet.
+    List<Integer> submittedIndexes = new ArrayList<>();
+    List<Future<?>> futureList = new ArrayList<>();
+    for (int i = 0; i < planList.size(); i++) {
+      if (results.containsKey(i) || insertMultiTabletPlan.isExecuted(i)) {
+        continue;
+      }
+      InsertTabletPlan plan = planList.get(i);
+      submittedIndexes.add(i);
+      futureList.add(
+          insertionPool.submit(
+              () -> {
+                insertTablet(plan);
+                return null;
+              }));
+    }
+
+    for (int i = 0; i < futureList.size(); i++) {
+      int planIndex = submittedIndexes.get(i);
+      try {
+        futureList.get(i).get();
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag instead of swallowing it; the outcome of this tablet is
+        // unknown, so it is reported as failed.
+        Thread.currentThread().interrupt();
+        results.put(planIndex, RpcUtils.getStatus(INTERNAL_SERVER_ERROR, e.getMessage()));
+      } catch (ExecutionException e) {
+        // The insertion threw: keep the error code of a QueryProcessException.
+        if (e.getCause() instanceof QueryProcessException) {
+          QueryProcessException qe = (QueryProcessException) e.getCause();
+          results.put(planIndex, RpcUtils.getStatus(qe.getErrorCode(), qe.getMessage()));
+        } else {
+          results.put(planIndex, RpcUtils.getStatus(INTERNAL_SERVER_ERROR, e.getMessage()));
+        }
+      }
+    }
+
+    if (!results.isEmpty()) {
+      throw new BatchProcessException(insertMultiTabletPlan.getFailingStatus());
+    }
+  }
+
+  /**
+   * Creates {@code insertionPool} on first use (or after it has terminated) and otherwise resizes
+   * it, so that it runs {@code min(sgSize, availableProcessors / 2)} threads, but at least one.
+   *
+   * <p>NOTE(review): the pool is created and resized without synchronization and is never shut
+   * down here; confirm whether one PlanExecutor can be used by several threads at once.
+   */
+  private void updateInsertTabletsPool(int sgSize) {
+    // A pool needs at least one thread: a maximum pool size of 0 is rejected with
+    // IllegalArgumentException by ThreadPoolExecutor.
+    int updateCoreSize =
+        Math.max(1, Math.min(sgSize, Runtime.getRuntime().availableProcessors() / 2));
+    if (insertionPool == null || insertionPool.isTerminated()) {
+      insertionPool =
+          (ThreadPoolExecutor)
+              IoTDBThreadPoolFactory.newFixedThreadPool(
+                  updateCoreSize, ThreadName.INSERTION_SERVICE.getName());
+    } else if (insertionPool.getCorePoolSize() > updateCoreSize) {
+      // Shrinking: lower the core size first so that core <= max holds at every step.
+      insertionPool.setCorePoolSize(updateCoreSize);
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+    } else if (insertionPool.getCorePoolSize() < updateCoreSize) {
+      // Growing: raise the maximum size first so that core <= max holds at every step.
+      insertionPool.setMaximumPoolSize(updateCoreSize);
+      insertionPool.setCorePoolSize(updateCoreSize);
+    }
+  }
+
   @Override
   public void insertTablet(InsertTabletPlan insertTabletPlan) throws QueryProcessException {
     if (insertTabletPlan.getRowCount() == 0) {
       return;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
index 07e3b85..5b6251a 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertMultiTabletPlan.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.db.qp.physical.crud;
 
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -29,11 +31,7 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.TreeMap;
+import java.util.*;
 
 /**
  * Mainly used in the distributed version, when multiple InsertTabletPlans belong to a raft
@@ -96,6 +94,10 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
 
   boolean[] isExecuted;
 
+  Boolean isEnableMultithreading; // cache for isEnableMultiThreading(); null until computed
+
+  Integer differentStorageGroupsCount; // cached count; null until computed
+
   public InsertMultiTabletPlan() {
     super(OperatorType.MULTI_BATCH_INSERT);
     this.insertTabletPlanList = new ArrayList<>();
@@ -384,4 +386,73 @@ public class InsertMultiTabletPlan extends InsertPlan implements BatchPlan {
       results.remove(i);
     }
   }
+
+  /**
+   * Returns the number of distinct storage-group prefixes among the devices of this plan's
+   * tablets, computed once and cached. A device's prefix is its first {@code
+   * defaultStorageGroupLevel + 1} path nodes.
+   *
+   * <p>NOTE(review): this approximates storage groups with the configured default storage group
+   * level; storage groups set explicitly at other levels are not consulted.
+   */
+  public int getDifferentStorageGroupsCount() {
+    if (differentStorageGroupsCount == null) {
+      // Read the loaded configuration: a freshly constructed IoTDBConfig only holds the built-in
+      // default level and would ignore the configured value.
+      IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+      int defaultStorageGroupLevel = config.getDefaultStorageGroupLevel();
+      Set<String> insertPlanSGSet = new HashSet<>();
+      for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+        String[] nodes = insertTabletPlan.getDeviceId().getNodes();
+        StringBuilder stringBuilder = new StringBuilder();
+        for (int i = 0; i <= defaultStorageGroupLevel && i < nodes.length; i++) {
+          stringBuilder.append(nodes[i]).append(".");
+        }
+        insertPlanSGSet.add(stringBuilder.toString());
+      }
+      differentStorageGroupsCount = insertPlanSGSet.size();
+    }
+    return differentStorageGroupsCount;
+  }
+
+  /**
+   * Decides, once per plan, whether the tablets of this plan should be inserted by multiple
+   * threads.
+   *
+   * <p>Multithreading only pays off when the insertions can really run concurrently and the
+   * tablets are large enough to outweigh the cost of switching between threads, so it is enabled
+   * only when all of the following hold:
+   *
+   * <ul>
+   *   <li>the tablets span more than one storage group; with a single storage group the
+   *       insertions would be locked and degenerate into serial execution;
+   *   <li>there are fewer than {@code 2 * availableProcessors()} storage groups; more may lead to
+   *       failures allocating off-heap memory and NPEs;
+   *   <li>at least half of the tablets have at least {@code
+   *       insertMultiTabletEnableMultithreadingColumnThreshold} columns.
+   * </ul>
+   */
+  public boolean isEnableMultiThreading() {
+    if (isEnableMultithreading == null) {
+      int sgSize = getDifferentStorageGroupsCount();
+      int cpuCount = Runtime.getRuntime().availableProcessors();
+      // Only 1 < sgSize < 2 * cpuCount is eligible (see the method documentation).
+      if (sgSize <= 1 || sgSize >= cpuCount * 2) {
+        isEnableMultithreading = false;
+      } else {
+        int columnThreshold =
+            IoTDBDescriptor.getInstance()
+                .getConfig()
+                .getInsertMultiTabletEnableMultithreadingColumnThreshold();
+        int largeTabletCount = 0;
+        for (InsertTabletPlan insertTabletPlan : insertTabletPlanList) {
+          if (insertTabletPlan.getColumns().length >= columnThreshold) {
+            largeTabletCount++;
+          }
+        }
+        isEnableMultithreading = largeTabletCount * 2 >= insertTabletPlanList.size();
+      }
+    }
+    return isEnableMultithreading;
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index dff65b4..062bbc3 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -101,4 +101,80 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
       Assert.assertEquals(60, record.getFields().size());
     }
   }
+
+  @Test
+  public void testInsertMultiTabletPlanParallel()
+      throws QueryProcessException, MetadataException, StorageEngineException, IOException,
+          InterruptedException, QueryFilterOptimizationException {
+    long[] times =
+        new long[] {
+          110L, 111L, 112L, 113L, 110L, 111L, 112L, 113L, 110L, 111L, 112L, 113L, 110L, 111L, 112L,
+        };
+    int size = times.length;
+    int columnCount = 16;
+
+    // Columns s1-s5 are DOUBLE, FLOAT, INT64, INT32 and BOOLEAN; s6-s16 are TEXT.
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    Object[] columns = new Object[columnCount];
+    columns[0] = new double[size];
+    columns[1] = new float[size];
+    columns[2] = new long[size];
+    columns[3] = new int[size];
+    columns[4] = new boolean[size];
+    for (int c = 5; c < columnCount; c++) {
+      dataTypes.add(TSDataType.TEXT.ordinal());
+      columns[c] = new Binary[size];
+    }
+
+    for (int r = 0; r < size; r++) {
+      ((double[]) columns[0])[r] = 1.0;
+      ((float[]) columns[1])[r] = 2;
+      ((long[]) columns[2])[r] = 10000;
+      ((int[]) columns[3])[r] = 100;
+      ((boolean[]) columns[4])[r] = false;
+      for (int c = 5; c < columnCount; c++) {
+        ((Binary[]) columns[c])[r] = new Binary("hh" + r);
+      }
+    }
+
+    String[] measurements = new String[columnCount];
+    for (int c = 0; c < columnCount; c++) {
+      measurements[c] = "s" + (c + 1);
+    }
+
+    // Ten devices in two storage groups: root.multi0.d0-d4 and root.multi1.d5-d9.
+    List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      // Give each plan its own copy of the measurement names rather than one shared array.
+      InsertTabletPlan tabletPlan =
+          new InsertTabletPlan(
+              new PartialPath("root.multi" + i / 5 + ".d" + i), measurements.clone(), dataTypes);
+      tabletPlan.setTimes(times);
+      tabletPlan.setColumns(columns);
+      tabletPlan.setRowCount(times.length);
+      insertTabletPlanList.add(tabletPlan);
+    }
+    PlanExecutor executor = new PlanExecutor();
+
+    InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+    // Two storage groups are only parallelized when 2 < 2 * availableProcessors, so the expected
+    // decision depends on the machine; the inserted data must be the same either way.
+    Assert.assertEquals(
+        Runtime.getRuntime().availableProcessors() > 1,
+        insertMultiTabletPlan.isEnableMultiThreading());
+    executor.insertTablet(insertMultiTabletPlan);
+
+    QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.**");
+    QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(160, dataSet.getPaths().size());
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      Assert.assertEquals(160, record.getFields().size());
+    }
+  }
 }