You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/25 03:36:16 UTC

[iotdb] 01/01: change the config

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch WalBufferPoolConfig
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e2f1fcc9ab2c0e84d9038cb526a59cc44be4d706
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 25 11:35:40 2021 +0800

    change the config
---
 .../resources/conf/iotdb-engine.properties         |  9 +++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 +++++++--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  | 28 ++++++--
 .../engine/storagegroup/StorageGroupProcessor.java | 82 +++++++++++-----------
 4 files changed, 95 insertions(+), 56 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 81e209e..6af92de 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -414,6 +414,15 @@ enable_stat_monitor=false
 enable_monitor_series_write=false
 
 ####################
+### WAL Direct Buffer Pool Configuration
+####################
+# the interval, in milliseconds, at which the WAL buffer pool is trimmed
+wal_pool_trim_interval_ms=10000
+
+# the max number of WAL byte buffers that can be allocated for each time partition; if there is no unseq data, you can set it to 4.
+max_wal_bytebuffer_num_for_each_partition=8
+
+####################
 ### External sort Configuration
 ####################
 # Is external sort enable
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0c98992..1e5ccfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -18,6 +18,11 @@
  */
 package org.apache.iotdb.db.conf;
 
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
@@ -31,16 +36,9 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSType;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
 public class IoTDBConfig {
 
   /* Names of Watermark methods */
@@ -167,6 +165,10 @@ public class IoTDBConfig {
    */
   private int walBufferSize = 16 * 1024 * 1024;
 
+  private int maxWalBytebufferNumForEachPartition = 8;
+
+  private long walPoolTrimIntervalInMS = 10_000;
+
   private int estimatedSeriesSize = 300;
 
   /**
@@ -1144,6 +1146,22 @@ public class IoTDBConfig {
     this.walBufferSize = walBufferSize;
   }
 
+  public int getMaxWalBytebufferNumForEachPartition() {
+    return maxWalBytebufferNumForEachPartition;
+  }
+
+  public void setMaxWalBytebufferNumForEachPartition(int maxWalBytebufferNumForEachPartition) {
+    this.maxWalBytebufferNumForEachPartition = maxWalBytebufferNumForEachPartition;
+  }
+
+  public long getWalPoolTrimIntervalInMS() {
+    return walPoolTrimIntervalInMS;
+  }
+
+  public void setWalPoolTrimIntervalInMS(long walPoolTrimIntervalInMS) {
+    this.walPoolTrimIntervalInMS = walPoolTrimIntervalInMS;
+  }
+
   public int getEstimatedSeriesSize() {
     return estimatedSeriesSize;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f26a1b5..dfadc8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -260,13 +260,6 @@ public class IoTDBDescriptor {
 
       conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
 
-      int walBufferSize =
-          Integer.parseInt(
-              properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize())));
-      if (walBufferSize > 0) {
-        conf.setWalBufferSize(walBufferSize);
-      }
-
       int mlogBufferSize =
           Integer.parseInt(
               properties.getProperty(
@@ -819,6 +812,27 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "enable_discard_out_of_order_data",
                 Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+
+    int walBufferSize =
+        Integer.parseInt(
+            properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize())));
+    if (walBufferSize > 0) {
+      conf.setWalBufferSize(walBufferSize);
+    }
+
+    int maxWalBytebufferNumForEachPartition =
+        Integer.parseInt(
+            properties.getProperty("max_wal_bytebuffer_num_for_each_partition", Integer.toString(conf.getMaxWalBytebufferNumForEachPartition())));
+    if (maxWalBytebufferNumForEachPartition > 0) {
+      conf.setMaxWalBytebufferNumForEachPartition(maxWalBytebufferNumForEachPartition);
+    }
+
+    // wal_pool_trim_interval_ms is a long setting: parse it as long so values above Integer.MAX_VALUE are accepted.
+    long poolTrimIntervalInMS = Long.parseLong(properties.getProperty(
+        "wal_pool_trim_interval_ms", Long.toString(conf.getWalPoolTrimIntervalInMS())));
+    if (poolTrimIntervalInMS > 0) {
+      conf.setWalPoolTrimIntervalInMS(poolTrimIntervalInMS);
+    }
   }
 
   private void loadAutoCreateSchemaProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 61b692c..edd1c42 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,38 @@
  */
 package org.apache.iotdb.db.engine.storagegroup;
 
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -77,43 +109,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
 /**
  * For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
  * TsFileProcessor in the working status. <br>
@@ -254,11 +252,6 @@ public class StorageGroupProcessor {
   private static final int WAL_BUFFER_SIZE =
       IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
 
-  private static final int MAX_WAL_BYTEBUFFER_NUM =
-      IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4;
-
-  private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000;
-
   private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>();
 
   private int currentWalPoolSize = 0;
@@ -277,6 +270,9 @@ public class StorageGroupProcessor {
   public ByteBuffer[] getWalDirectByteBuffer() {
     ByteBuffer[] res = new ByteBuffer[2];
     synchronized (walByteBufferPool) {
+      long startTime = System.nanoTime();
+      int MAX_WAL_BYTEBUFFER_NUM =
+          config.getConcurrentWritingTimePartition() * config.getMaxWalBytebufferNumForEachPartition();
       while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) {
         try {
           walByteBufferPool.wait();
@@ -288,6 +284,8 @@ public class StorageGroupProcessor {
               virtualStorageGroupId,
               e);
         }
+        logger.info("Waiting {} ms for wal direct byte buffer.",
+            (System.nanoTime() - startTime) / 1_000_000);
       }
       // If the queue is not empty, it must have at least two.
       if (!walByteBufferPool.isEmpty()) {
@@ -337,7 +335,7 @@ public class StorageGroupProcessor {
       // we will trim the size to expectedSize until the pool is empty
       while (expectedSize < currentWalPoolSize
           && !walByteBufferPool.isEmpty()
-          && poolNotEmptyIntervalInMS >= DEFAULT_POOL_TRIM_INTERVAL_MILLIS) {
+          && poolNotEmptyIntervalInMS >= config.getWalPoolTrimIntervalInMS()) {
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
         MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
         currentWalPoolSize -= 2;
@@ -380,8 +378,8 @@ public class StorageGroupProcessor {
     ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
     executorService.scheduleWithFixedDelay(
         this::trimTask,
-        DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
-        DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
+        config.getWalPoolTrimIntervalInMS(),
+        config.getWalPoolTrimIntervalInMS(),
         TimeUnit.MILLISECONDS);
     recover();
   }