You are viewing a plain text version of this content. (The link to the canonical version was lost in this plain-text copy.)
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/01/02 06:00:29 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-1297] Refactor the memory control when enabling time partitions (#4659)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new d4cab76  [To rel/0.12][IOTDB-1297] Refactor the memory control when enabling time partitions (#4659)
d4cab76 is described below

commit d4cab763e07df3717705e6f22006c93ad8649b2f
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Sun Jan 2 13:59:54 2022 +0800

    [To rel/0.12][IOTDB-1297] Refactor the memory control when enabling time partitions (#4659)
---
 .../resources/conf/iotdb-engine.properties         |  8 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 23 ++++-
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 20 ++++-
 .../engine/storagegroup/StorageGroupProcessor.java | 25 ++++--
 .../writelog/manager/MultiFileLogNodeManager.java  | 38 ++++++++-
 .../db/qp/physical/InsertTabletMultiPlanTest.java  | 99 ++++++++++++++++++++++
 6 files changed, 202 insertions(+), 11 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 8e63e07..28a4f65 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -490,6 +490,12 @@ timestamp_precision=ms
 # it should be an even number
 # max_wal_bytebuffer_num_for_each_partition=6
 
+# If an OOM occurs while allocating the WAL byte buffers, the system sleeps for this interval (ms) and then retries.
+# register_buffer_sleep_interval_in_ms=200
+
+# If the total sleep time reaches this threshold (ms), the system rejects the write.
+# register_buffer_reject_threshold_in_ms=10000
+
 ####################
 ### External sort Configuration
 ####################
@@ -727,7 +733,7 @@ timestamp_precision=ms
 # time range for partitioning data inside each storage group, the unit is second
 # partition_interval=604800
 
-# concurrent_writing_time_partition=500
+# concurrent_writing_time_partition=1
 
 # admin username, default is root
 # Datatype: string
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 756b88d..b880ae5 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -172,6 +172,11 @@ public class IoTDBConfig {
 
   private long walPoolTrimIntervalInMS = 10_000;
 
+  /** if OOM occurs when registering bytebuffer, system will sleep awhile and then try again. */
+  private long registerBufferSleepIntervalInMs = 200;
+  /** if total sleep time exceeds this, system will reject this write. */
+  private long registerBufferRejectThresholdInMs = 10_000;
+
   private int estimatedSeriesSize = 300;
 
   /**
@@ -589,7 +594,7 @@ public class IoTDBConfig {
   private String kerberosPrincipal = "your principal";
 
   /** the num of memtable in each storage group */
-  private int concurrentWritingTimePartition = 500;
+  private int concurrentWritingTimePartition = 1;
 
   /** the default fill interval in LinearFill and PreviousFill, -1 means infinite past time */
   private int defaultFillInterval = -1;
@@ -1248,6 +1253,22 @@ public class IoTDBConfig {
     this.walPoolTrimIntervalInMS = walPoolTrimIntervalInMS;
   }
 
+  public long getRegisterBufferSleepIntervalInMs() {
+    return registerBufferSleepIntervalInMs;
+  }
+
+  public void setRegisterBufferSleepIntervalInMs(long registerBufferSleepIntervalInMs) {
+    this.registerBufferSleepIntervalInMs = registerBufferSleepIntervalInMs;
+  }
+
+  public long getRegisterBufferRejectThresholdInMs() {
+    return registerBufferRejectThresholdInMs;
+  }
+
+  public void setRegisterBufferRejectThresholdInMs(long registerBufferRejectThresholdInMs) {
+    this.registerBufferRejectThresholdInMs = registerBufferRejectThresholdInMs;
+  }
+
   public int getEstimatedSeriesSize() {
     return estimatedSeriesSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 8df773d..a2c6d3d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -833,12 +833,30 @@ public class IoTDBDescriptor {
     }
 
     long poolTrimIntervalInMS =
-        Integer.parseInt(
+        Long.parseLong(
             properties.getProperty(
                 "wal_pool_trim_interval_ms", Long.toString(conf.getWalPoolTrimIntervalInMS())));
     if (poolTrimIntervalInMS > 0) {
       conf.setWalPoolTrimIntervalInMS(poolTrimIntervalInMS);
     }
+
+    long registerBufferSleepIntervalInMs =
+        Long.parseLong(
+            properties.getProperty(
+                "register_buffer_sleep_interval_in_ms",
+                Long.toString(conf.getRegisterBufferSleepIntervalInMs())));
+    if (registerBufferSleepIntervalInMs > 0) {
+      conf.setRegisterBufferSleepIntervalInMs(registerBufferSleepIntervalInMs);
+    }
+
+    long registerBufferRejectThresholdInMs =
+        Long.parseLong(
+            properties.getProperty(
+                "register_buffer_reject_threshold_in_ms",
+                Long.toString(conf.getRegisterBufferRejectThresholdInMs())));
+    if (registerBufferRejectThresholdInMs > 0) {
+      conf.setRegisterBufferRejectThresholdInMs(registerBufferRejectThresholdInMs);
+    }
   }
 
   private void loadAutoCreateSchemaProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 42d20ed..e44ce60 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -268,7 +268,10 @@ public class StorageGroupProcessor {
 
   private volatile boolean compacting = false;
 
-  /** get the direct byte buffer from pool, each fetch contains two ByteBuffer */
+  /**
+   * get the direct byte buffer from pool, each fetch contains two ByteBuffer, return null if fetch
+   * fails
+   */
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
     synchronized (walByteBufferPool) {
@@ -298,9 +301,21 @@ public class StorageGroupProcessor {
       } else {
         // if the queue is empty and current size is less than MAX_BYTEBUFFER_NUM
         // we can construct another two more new byte buffer
-        currentWalPoolSize += 2;
-        res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
-        res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+        try {
+          currentWalPoolSize += 2;
+          res[0] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+          res[1] = ByteBuffer.allocateDirect(WAL_BUFFER_SIZE);
+        } catch (OutOfMemoryError e) {
+          // roll back the whole reservation above; neither buffer enters the pool
+          currentWalPoolSize -= 2;
+          if (res[0] != null) {
+            MmapUtil.clean((MappedByteBuffer) res[0]);
+          }
+          if (res[1] != null) {
+            MmapUtil.clean((MappedByteBuffer) res[1]);
+          }
+          return null;
+        }
       }
       // if the pool is empty, set the time back to MAX_VALUE
       if (walByteBufferPool.isEmpty()) {
@@ -1306,7 +1321,7 @@ public class StorageGroupProcessor {
     synchronized (walByteBufferPool) {
       while (!walByteBufferPool.isEmpty()) {
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeFirst());
-        currentWalPoolSize--;
+        currentWalPoolSize -= 1;
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
index a9283b1..ddaf132 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/manager/MultiFileLogNodeManager.java
@@ -46,10 +46,17 @@ import java.util.function.Supplier;
 public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
 
   private static final Logger logger = LoggerFactory.getLogger(MultiFileLogNodeManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  // if OOM occurs when registering bytebuffer, getNode method will sleep awhile and then try again
+  private static final long REGISTER_BUFFER_SLEEP_INTERVAL_IN_MS =
+      config.getRegisterBufferSleepIntervalInMs();
+  // if total sleep time exceeds this, getNode method will reject this write
+  private static final long REGISTER_BUFFER_REJECT_THRESHOLD_IN_MS =
+      config.getRegisterBufferRejectThresholdInMs();
+
   private final Map<String, WriteLogNode> nodeMap;
 
   private ScheduledExecutorService executorService;
-  private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private boolean firstReadOnly = true;
 
   private void forceTask() {
@@ -90,9 +97,34 @@ public class MultiFileLogNodeManager implements WriteLogNodeManager, IService {
       node = new ExclusiveWriteLogNode(identifier);
       WriteLogNode oldNode = nodeMap.putIfAbsent(identifier, node);
       if (oldNode != null) {
-        return oldNode;
+        node = oldNode;
       } else {
-        node.initBuffer(supplier.get());
+        ByteBuffer[] buffers = supplier.get();
+        long sleepTimeInMs = 0;
+        while (buffers == null) {
+          // log error if this is the first time
+          if (sleepTimeInMs == 0) {
+            logger.error(
+                "Cannot allocate bytebuffer for wal, please reduce wal_buffer_size or storage groups number");
+          }
+          // sleep a while and retry; on interrupt, unregister our node and give up
+          try {
+            Thread.sleep(REGISTER_BUFFER_SLEEP_INTERVAL_IN_MS);
+            sleepTimeInMs += REGISTER_BUFFER_SLEEP_INTERVAL_IN_MS;
+          } catch (InterruptedException e) {
+            nodeMap.remove(identifier, node);
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while allocating bytebuffer for wal", e);
+          }
+          // slept too long: unregister the uninitialized node and reject this write
+          if (sleepTimeInMs >= REGISTER_BUFFER_REJECT_THRESHOLD_IN_MS) {
+            nodeMap.remove(identifier, node);
+            throw new RuntimeException(
+                "Cannot allocate bytebuffer for wal, please reduce wal_buffer_size or storage groups number");
+          }
+          buffers = supplier.get();
+        }
+        node.initBuffer(buffers);
       }
     }
     return node;
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
index aaa9c0f..d782a77 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletMultiPlanTest.java
@@ -100,4 +100,103 @@ public class InsertTabletMultiPlanTest extends InsertTabletPlanTest {
       Assert.assertEquals(60, record.getFields().size());
     }
   }
+
+  @Test
+  public void testHugeInsertMultiTabletPlan()
+      throws QueryProcessException, MetadataException, StorageEngineException, IOException,
+          InterruptedException, QueryFilterOptimizationException {
+    // run this test case, can throw write_process_npe
+    long[] times = new long[10000];
+    for (int i = 0; i < times.length; i++) {
+      times[i] = i;
+    }
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+    dataTypes.add(TSDataType.INT32.ordinal());
+    dataTypes.add(TSDataType.BOOLEAN.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+    dataTypes.add(TSDataType.TEXT.ordinal());
+
+    Object[] columns = new Object[16];
+    int size = (times).length;
+    columns[0] = new double[size];
+    columns[1] = new float[size];
+    columns[2] = new long[size];
+    columns[3] = new int[size];
+    columns[4] = new boolean[size];
+    columns[5] = new Binary[size];
+    columns[6] = new Binary[size];
+    columns[7] = new Binary[size];
+    columns[8] = new Binary[size];
+    columns[9] = new Binary[size];
+    columns[10] = new Binary[size];
+    columns[11] = new Binary[size];
+    columns[12] = new Binary[size];
+    columns[13] = new Binary[size];
+    columns[14] = new Binary[size];
+    columns[15] = new Binary[size];
+
+    for (int r = 0; r < size; r++) {
+      ((double[]) columns[0])[r] = 1.0;
+      ((float[]) columns[1])[r] = 2;
+      ((long[]) columns[2])[r] = 10000;
+      ((int[]) columns[3])[r] = 100;
+      ((boolean[]) columns[4])[r] = false;
+      ((Binary[]) columns[5])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[6])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[7])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[8])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[9])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[10])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[11])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[12])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[13])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[14])[r] = new Binary("hh" + r);
+      ((Binary[]) columns[15])[r] = new Binary("hh" + r);
+    }
+
+    List<InsertTabletPlan> insertTabletPlanList = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) {
+      InsertTabletPlan tabletPlan =
+          new InsertTabletPlan(
+              new PartialPath("root.multi" + i / 20 + ".d" + i),
+              new String[] {
+                "s1", "s2", "s3", "s4", "s5", "s6", "s7", "s8", "s9", "s10", "s11", "s12", "s13",
+                "s14", "s15", "s16"
+              },
+              dataTypes);
+      tabletPlan.setTimes(times);
+      tabletPlan.setColumns(columns);
+      tabletPlan.setRowCount(times.length);
+      insertTabletPlanList.add(tabletPlan);
+    }
+    PlanExecutor executor = new PlanExecutor();
+
+    InsertMultiTabletPlan insertMultiTabletPlan = new InsertMultiTabletPlan(insertTabletPlanList);
+
+    executor.insertTablet(insertMultiTabletPlan);
+
+    for (int i = 0; i < 1000; i++) {
+      QueryPlan queryPlan =
+          (QueryPlan)
+              processor.parseSQLToPhysicalPlan("select * from root.multi" + i / 20 + ".d" + i);
+      QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+      Assert.assertEquals(16, dataSet.getPaths().size());
+      while (dataSet.hasNext()) {
+        RowRecord record = dataSet.next();
+        Assert.assertEquals(16, record.getFields().size());
+      }
+    }
+  }
 }