You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/12 02:05:50 UTC
[incubator-iotdb] branch dev_merge updated: add merge configs
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new d47cfa5 add merge configs
d47cfa5 is described below
commit d47cfa56bbe5aa4b414352c08759427b237b91a3
Author: 江天 <jt...@163.com>
AuthorDate: Fri Jul 12 10:03:32 2019 +0800
add merge configs
---
iotdb/iotdb/conf/iotdb-engine.properties | 27 +++++++++++++++++++++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +++++-----
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 9 ++++++++
.../apache/iotdb/db/engine/merge/MergeManager.java | 26 +++++++++++++--------
.../engine/storagegroup/StorageGroupProcessor.java | 2 +-
5 files changed, 59 insertions(+), 17 deletions(-)
diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index 0466025..4001bf2 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -129,7 +129,32 @@ concurrent_flush_thread=0
# whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
# (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+####################
+### Merge Configurations
+####################
+
+# How many threads will be set up to perform merges, 1 by default.
+# Values less than or equal to 0 are treated as 1.
+merge_thread_num=1
+
+# How much memory may be used in ONE merge task (in bytes), 20% of maximum JVM memory by default.
+# This is only a rough estimation, starting from a relatively small value to avoid OOM.
+# Each new merge thread may take such memory, so merge_thread_num * merge_memory_budget is the
+# total memory estimation of merge.
+# merge_memory_budget=2147483648
+
+# When set to true, crashed merges detected during system reboot will be continued.
+# When set to false, the unfinished part of such merges will not be restarted, while the
+# finished part remains as it is.
+# If rebooting feels too slow, set this to false. True by default.
+continue_merge_after_reboot=true
+
+# A global merge will be performed once per interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: seconds, default: 2 hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=7200
####################
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e25809c..79e1c25 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -182,9 +182,9 @@ public class IoTDBConfig {
private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
- private int mergeThreadNum = 2;
+ private int mergeThreadNum = 1;
- private boolean mergeAfterReboot = true;
+ private boolean continueMergeAfterReboot = true;
private long mergeIntervalSec = 2 * 3600L;
@@ -512,12 +512,12 @@ public class IoTDBConfig {
this.mergeThreadNum = mergeThreadNum;
}
- public boolean isMergeAfterReboot() {
- return mergeAfterReboot;
+ public boolean isContinueMergeAfterReboot() {
+ return continueMergeAfterReboot;
}
- public void setMergeAfterReboot(boolean mergeAfterReboot) {
- this.mergeAfterReboot = mergeAfterReboot;
+ public void setContinueMergeAfterReboot(boolean continueMergeAfterReboot) {
+ this.continueMergeAfterReboot = continueMergeAfterReboot;
}
public long getMergeIntervalSec() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57e941e..589a2f7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -200,6 +200,15 @@ public class IoTDBDescriptor {
conf.setZoneID(ZoneId.of(tmpTimeZone.trim()));
logger.info("Time zone has been set to {}", conf.getZoneID());
+ conf.setMergeMemoryBudget(Long.parseLong(properties.getProperty("merge_memory_budget",
+ Long.toString(conf.getMergeMemoryBudget()))));
+ conf.setMergeThreadNum(Integer.parseInt(properties.getProperty("merge_thread_num",
+ Integer.toString(conf.getMergeThreadNum()))));
+ conf.setContinueMergeAfterReboot(Boolean.parseBoolean(properties.getProperty(
+ "continue_merge_after_reboot", Boolean.toString(conf.isContinueMergeAfterReboot()))));
+ conf.setMergeIntervalSec(Long.parseLong(properties.getProperty("merge_interval_sec",
+ Long.toString(conf.getMergeIntervalSec()))));
+
} catch (IOException e) {
logger.warn("Cannot load config file because, use default configuration", e);
} catch (Exception e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
index 0ab24b9..4e9a949 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
@@ -37,7 +37,7 @@ public class MergeManager implements IService {
private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
private static final MergeManager INSTANCE = new MergeManager();
- private AtomicInteger threadNum = new AtomicInteger();
+ private AtomicInteger threadCnt = new AtomicInteger();
private ThreadPoolExecutor mergeTaskPool;
private ScheduledExecutorService timedMergeThreadPool;
@@ -56,13 +56,19 @@ public class MergeManager implements IService {
@Override
public void start() {
if (mergeTaskPool == null) {
+ int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeConcurrentThreads();
+ if (threadNum <= 0) {
+ threadNum = 1;
+ }
mergeTaskPool =
- (ThreadPoolExecutor) Executors.newFixedThreadPool(
- IoTDBDescriptor.getInstance().getConfig().getMergeConcurrentThreads(),
- r -> new Thread(r, "mergeThread-" + threadNum.getAndIncrement()));
- timedMergeThreadPool = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
- timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, 0,
- IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec(), TimeUnit.SECONDS);
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum,
+ r -> new Thread(r, "mergeThread-" + threadCnt.getAndIncrement()));
+ long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
+ if (mergeInterval > 0) {
+ timedMergeThreadPool = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
+ timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, 0,
+ mergeInterval, TimeUnit.SECONDS);
+ }
}
logger.info("MergeManager started");
}
@@ -70,14 +76,16 @@ public class MergeManager implements IService {
@Override
public void stop() {
if (mergeTaskPool != null) {
- timedMergeThreadPool.shutdownNow();
+ if (timedMergeThreadPool != null) {
+ timedMergeThreadPool.shutdownNow();
+ timedMergeThreadPool = null;
+ }
mergeTaskPool.shutdownNow();
logger.info("Waiting for task pool to shut down");
while (!mergeTaskPool.isShutdown()) {
// wait
}
mergeTaskPool = null;
- timedMergeThreadPool = null;
}
logger.info("MergeManager stopped");
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f805feb..0aac9a7 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -198,7 +198,7 @@ public class StorageGroupProcessor {
RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
- recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isMergeAfterReboot());
+ recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
} catch (IOException | MetadataErrorException e) {
throw new ProcessorException(e);
}