You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/02/25 03:36:16 UTC
[iotdb] 01/01: change the confid
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch WalBufferPoolConfig
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e2f1fcc9ab2c0e84d9038cb526a59cc44be4d706
Author: JackieTien97 <Ja...@foxmail.com>
AuthorDate: Thu Feb 25 11:35:40 2021 +0800
change the confid
---
.../resources/conf/iotdb-engine.properties | 9 +++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 32 +++++++--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 28 ++++++--
.../engine/storagegroup/StorageGroupProcessor.java | 82 +++++++++++-----------
4 files changed, 95 insertions(+), 56 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 81e209e..6af92de 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -414,6 +414,15 @@ enable_stat_monitor=false
enable_monitor_series_write=false
####################
+### WAL Direct Buffer Pool Configuration
+####################
+# The interval, in milliseconds, at which the WAL buffer pool is trimmed
+wal_pool_trim_interval_ms=10000
+
+# The maximum number of WAL byte buffers that can be allocated for each time partition. If there is no unsequence data, you can set it to 4.
+max_wal_bytebuffer_num_for_each_partition=8
+
+####################
### External sort Configuration
####################
# Is external sort enable
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 0c98992..1e5ccfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -18,6 +18,11 @@
*/
package org.apache.iotdb.db.conf;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+import java.io.File;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.merge.selector.MergeFileStrategy;
@@ -31,16 +36,9 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSType;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
-
public class IoTDBConfig {
/* Names of Watermark methods */
@@ -167,6 +165,10 @@ public class IoTDBConfig {
*/
private int walBufferSize = 16 * 1024 * 1024;
+ private int maxWalBytebufferNumForEachPartition = 8;
+
+ private long walPoolTrimIntervalInMS = 10_000;
+
private int estimatedSeriesSize = 300;
/**
@@ -1144,6 +1146,22 @@ public class IoTDBConfig {
this.walBufferSize = walBufferSize;
}
+ public int getMaxWalBytebufferNumForEachPartition() {
+ return maxWalBytebufferNumForEachPartition;
+ }
+
+ public void setMaxWalBytebufferNumForEachPartition(int maxWalBytebufferNumForEachPartition) {
+ this.maxWalBytebufferNumForEachPartition = maxWalBytebufferNumForEachPartition;
+ }
+
+ public long getWalPoolTrimIntervalInMS() {
+ return walPoolTrimIntervalInMS;
+ }
+
+ public void setWalPoolTrimIntervalInMS(long walPoolTrimIntervalInMS) {
+ this.walPoolTrimIntervalInMS = walPoolTrimIntervalInMS;
+ }
+
public int getEstimatedSeriesSize() {
return estimatedSeriesSize;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index f26a1b5..dfadc8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -260,13 +260,6 @@ public class IoTDBDescriptor {
conf.setWalDir(properties.getProperty("wal_dir", conf.getWalDir()));
- int walBufferSize =
- Integer.parseInt(
- properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize())));
- if (walBufferSize > 0) {
- conf.setWalBufferSize(walBufferSize);
- }
-
int mlogBufferSize =
Integer.parseInt(
properties.getProperty(
@@ -819,6 +812,27 @@ public class IoTDBDescriptor {
properties.getProperty(
"enable_discard_out_of_order_data",
Boolean.toString(conf.isEnableDiscardOutOfOrderData()))));
+
+ int walBufferSize =
+ Integer.parseInt(
+ properties.getProperty("wal_buffer_size", Integer.toString(conf.getWalBufferSize())));
+ if (walBufferSize > 0) {
+ conf.setWalBufferSize(walBufferSize);
+ }
+
+ int maxWalBytebufferNumForEachPartition =
+ Integer.parseInt(
+ properties.getProperty("max_wal_bytebuffer_num_for_each_partition", Integer.toString(conf.getMaxWalBytebufferNumForEachPartition())));
+ if (maxWalBytebufferNumForEachPartition > 0) {
+ conf.setMaxWalBytebufferNumForEachPartition(maxWalBytebufferNumForEachPartition);
+ }
+
+ long poolTrimIntervalInMS =
+ Long.parseLong( // the setting is a long; Integer.parseInt would reject values above Integer.MAX_VALUE
+ properties.getProperty("wal_pool_trim_interval_ms", Long.toString(conf.getWalPoolTrimIntervalInMS())));
+ if (poolTrimIntervalInMS > 0) { // non-positive values are ignored and the current setting is kept
+ conf.setWalPoolTrimIntervalInMS(poolTrimIntervalInMS);
+ }
}
private void loadAutoCreateSchemaProps(Properties properties) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 61b692c..edd1c42 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -18,6 +18,38 @@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -77,43 +109,9 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-
-import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
-import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
-import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
-
/**
* For sequence data, a StorageGroupProcessor has some TsFileProcessors, in which there is only one
* TsFileProcessor in the working status. <br>
@@ -254,11 +252,6 @@ public class StorageGroupProcessor {
private static final int WAL_BUFFER_SIZE =
IoTDBDescriptor.getInstance().getConfig().getWalBufferSize() / 2;
- private static final int MAX_WAL_BYTEBUFFER_NUM =
- IoTDBDescriptor.getInstance().getConfig().getConcurrentWritingTimePartition() * 4;
-
- private static final long DEFAULT_POOL_TRIM_INTERVAL_MILLIS = 10_000;
-
private final Deque<ByteBuffer> walByteBufferPool = new LinkedList<>();
private int currentWalPoolSize = 0;
@@ -277,6 +270,9 @@ public class StorageGroupProcessor {
public ByteBuffer[] getWalDirectByteBuffer() {
ByteBuffer[] res = new ByteBuffer[2];
synchronized (walByteBufferPool) {
+ long startTime = System.nanoTime();
+ int MAX_WAL_BYTEBUFFER_NUM =
+ config.getConcurrentWritingTimePartition() * config.getMaxWalBytebufferNumForEachPartition();
while (walByteBufferPool.isEmpty() && currentWalPoolSize + 2 > MAX_WAL_BYTEBUFFER_NUM) {
try {
walByteBufferPool.wait();
@@ -288,6 +284,8 @@ public class StorageGroupProcessor {
virtualStorageGroupId,
e);
}
+ logger.info("Waiting {} ms for wal direct byte buffer.",
+ (System.nanoTime() - startTime) / 1_000_000);
}
// If the queue is not empty, it must have at least two.
if (!walByteBufferPool.isEmpty()) {
@@ -337,7 +335,7 @@ public class StorageGroupProcessor {
// we will trim the size to expectedSize until the pool is empty
while (expectedSize < currentWalPoolSize
&& !walByteBufferPool.isEmpty()
- && poolNotEmptyIntervalInMS >= DEFAULT_POOL_TRIM_INTERVAL_MILLIS) {
+ && poolNotEmptyIntervalInMS >= config.getWalPoolTrimIntervalInMS()) {
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
MmapUtil.clean((MappedByteBuffer) walByteBufferPool.removeLast());
currentWalPoolSize -= 2;
@@ -380,8 +378,8 @@ public class StorageGroupProcessor {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(
this::trimTask,
- DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
- DEFAULT_POOL_TRIM_INTERVAL_MILLIS,
+ config.getWalPoolTrimIntervalInMS(),
+ config.getWalPoolTrimIntervalInMS(),
TimeUnit.MILLISECONDS);
recover();
}