You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/25 03:36:15 UTC

[iotdb] branch WalBufferPoolConfig created (now e2f1fcc)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a change to branch WalBufferPoolConfig
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at e2f1fcc  change the confid

This branch includes the following new commits:

     new e2f1fcc  change the confid

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: change the confid

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch WalBufferPoolConfig
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e2f1fcc9ab2c0e84d9038cb526a59cc44be4d706
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 25 11:35:40 2021 +0800

    change the confid
---
 .../resources/conf/iotdb-engine.properties         |  9 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 +++++++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 28 ++++++--
 .../engine/storagegroup/StorageGroupProcessor.java | 82 +++++++++++-----------
 4 files changed, 95 insertions(+), 56 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 81e209e..6af92de 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -414,6 +414,15 @@ enable_stat_monitor=false
 enable_monitor_series_write=false
 
 ####################
+### WAL Direct Buffer Pool Configuration
+####################
+# The interval, in milliseconds, at which the WAL buffer pool is trimmed
+wal_pool_trim_interval_ms=10000
+
+# The maximum number of WAL byte buffers that can be allocated for each time partition. If there is no unsequence data, you can set it to 4.
+max_wal_bytebuffer_num_for_each_partition=8
+
+####################
 ### External sort Configuration
 ####################
 # Is external sort enable
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0c98992..1e5ccfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -18,6 +18,11 @@
  */
 package org.apache.iotdb.db.conf;
 
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
@@ -31,16 +36,9 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSType;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
 public class IoTDBConfig {
 
   /* Names of Watermark methods */
@@ -167,6 +165,10 @@ public class IoTDBConfig {
    */
   private int walBufferSize = 16 * 1024 * 1024;
 
+  private int maxWalBytebufferNumForEachPartition = 8;
+
+  private long walPoolTrimIntervalInMS = 10_000;
+
   private int estimatedSeriesSize = 300;
 
   /**
@@ -1144,6 +1146,22 @@ public class IoTDBConfig {
     this.walBufferSize = walBufferSize;
   }
 
+  public int getMaxWalBytebufferNumForEachPartition() {
+    return maxWalBytebufferNumForEachPartition;
+  }
+
+  public void setMaxWalBytebufferNumForEachPartition(int maxWalBytebufferNumForEachPartition) {
+    this.maxWalBytebufferNumForEachPartition = maxWalBytebufferNumForEachPartition;
+  }
+
+  public long getWalPoolTrimIntervalInMS() {
+    return walPoolTrimIntervalInMS;
+  }
+
+  public void setWalPoolTrimIntervalInMS(long walPoolTrimIntervalInMS) {
+    this.walPoolTrimIntervalInMS = walPoolTrimIntervalInMS;
+  }
+
   public int getEstimatedSeriesSize() {
     return estimatedSeriesSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f26a1b5..dfadc8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -260,13 +260,6 @@ public class IoTDBDescriptor {
 
       conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
 
-      int walBufferSize =
-          Integer.parseInt(
-              properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize())));
-      if (walBufferSize > 0) {
-        conf.setWalBufferSize(walBufferSize);
-      }
-
       int mlogBufferSize =
           Integer.parseInt(
               properties.getProperty(
@@ -819,6 +812,27 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "enable_discard_out_of_order_data",
                 Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+
+    int walBufferSize =
+        Integer.parseInt(
+            properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize())));
+    if (walBufferSize > 0) {
+      conf.setWalBufferSize(walBufferSize);
+    }
+
+    int maxWalBytebufferNumForEachPartition =
+        Integer.parseInt(
+            properties.getProperty("max_wal_bytebuffer_num_for_each_partition", Integer.toString(conf.getMaxWalBytebufferNumForEachPartition())));
+    if (maxWalBytebufferNumForEachPartition > 0) {
+      conf.setMaxWalBytebufferNumForEachPartition(maxWalBytebufferNumForEachPartition);
+    }
+
+    // Parse as long to match the long-typed setting; a non-positive value leaves the config unchanged.
+    long poolTrimIntervalInMS = Long.parseLong(properties.getProperty(
+        "wal_pool_trim_interval_ms", Long.toString(conf.getWalPoolTrimIntervalInMS())));
+    if (poolTrimIntervalInMS > 0) {
+      conf.setWalPoolTrimIntervalInMS(poolTrimIntervalInMS);
+    }
   }
 
   private void loadAutoCreateSchemaProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 61b692c..edd1c42 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,38 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -77,43 +109,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
  * TsFileProcessor in the working status. <br>
@@ -254,11 +252,6 @@ public class StorageGroupProcessor {
   private static final int WAL_BUFFER_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
 
-  private static final int MAX_WAL_BYTEBUFFER_NUM =
-      IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4;
-
-  private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000;
-
   private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>();
 
   private int currentWalPoolSize = 0;
@@ -277,6 +270,9 @@ public class StorageGroupProcessor {
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
     synchronized (walByteBufferPool) {
+      long startTime = System.nanoTime();
+      int MAX_WAL_BYTEBUFFER_NUM =
+          config.getConcurrentWritingTimePartition() * config.getMaxWalBytebufferNumForEachPartition();
       while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) {
         try {
           walByteBufferPool.wait();
@@ -288,6 +284,8 @@ public class StorageGroupProcessor {
               virtualStorageGroupId,
               e);
         }
+        logger.info("Waiting {} ms for wal direct byte buffer.",
+            (System.nanoTime() - startTime) / 1_000_000);
       }
       // If the queue is not empty, it must have at least two.
       if (!walByteBufferPool.isEmpty()) {
@@ -337,7 +335,7 @@ public class StorageGroupProcessor {
       // we will trim the size to expectedSize until the pool is empty
       while (expectedSize < currentWalPoolSize
           && !walByteBufferPool.isEmpty()
-          && poolNotEmptyIntervalInMS >= DEFAULT_POOL_TRIM_INTERVAL_MILLIS) {
+          && poolNotEmptyIntervalInMS >= config.getWalPoolTrimIntervalInMS()) {
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
         currentWalPoolSize -= 2;
@@ -380,8 +378,8 @@ public class StorageGroupProcessor {
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     executorService.scheduleWithFixedDelay(
         this::trimTask,
-        DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
-        DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
+        config.getWalPoolTrimIntervalInMS(),
+        config.getWalPoolTrimIntervalInMS(),
         TimeUnit.MILLISECONDS);
     recover();
   }