You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/12 02:05:50 UTC

[incubator-iotdb] branch dev_merge updated: add merge configs

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new d47cfa5  add merge configs
d47cfa5 is described below

commit d47cfa56bbe5aa4b414352c08759427b237b91a3
Author: 江天 <jt...@163.com>
AuthorDate: Fri Jul 12 10:03:32 2019 +0800

    add merge configs
---
 iotdb/iotdb/conf/iotdb-engine.properties           | 27 +++++++++++++++++++++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 +++++-----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  9 ++++++++
 .../apache/iotdb/db/engine/merge/MergeManager.java | 26 +++++++++++++--------
 .../engine/storagegroup/StorageGroupProcessor.java |  2 +-
 5 files changed, 59 insertions(+), 17 deletions(-)

diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index 0466025..4001bf2 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -129,7 +129,32 @@ concurrent_flush_thread=0
 
 # whether take over the memory management by IoTDB rather than JVM when serializing memtable as bytes in memory
 # (i.e., whether use ChunkBufferPool), value true, false
-chunk_buffer_pool_enable = false
+chunk_buffer_pool_enable=false
+
+####################
+### Merge Configurations
+####################
+
+# How many threads will be set up to perform merges, 1 by default.
+# Values less than or equal to 0 are treated as 1.
+merge_thread_num=1
+
+# How much memory may be used in ONE merge task (in bytes), 20% of maximum JVM memory by default.
+# This is only a rough estimate; start with a relatively small value to avoid OOM.
+# Each merge thread may use up to this much memory, so merge_thread_num * merge_memory_budget is
+# the estimated total memory used by merges.
+# merge_memory_budget=2147483648
+
+# When set to true, merges that were interrupted by a crash are detected during system reboot and
+# continued. Otherwise, the unfinished part of such merges is not restarted, while the finished
+# part remains as it is.
+# If rebooting feels too slow, set this to false. True by default.
+continue_merge_after_reboot=true
+
+# A global merge will be performed once per interval, that is, each storage group will be merged
+# (if proper merge candidates can be found). Unit: seconds, default: 2 hours.
+# When less than or equal to 0, timed merge is disabled.
+merge_interval_sec=7200
 
 
 ####################
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e25809c..79e1c25 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -182,9 +182,9 @@ public class IoTDBConfig {
 
   private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
 
-  private int mergeThreadNum = 2;
+  private int mergeThreadNum = 1;
 
-  private boolean mergeAfterReboot = true;
+  private boolean continueMergeAfterReboot = true;
 
   private long mergeIntervalSec = 2 * 3600L;
 
@@ -512,12 +512,12 @@ public class IoTDBConfig {
     this.mergeThreadNum = mergeThreadNum;
   }
 
-  public boolean isMergeAfterReboot() {
-    return mergeAfterReboot;
+  public boolean isContinueMergeAfterReboot() {
+    return continueMergeAfterReboot;
   }
 
-  public void setMergeAfterReboot(boolean mergeAfterReboot) {
-    this.mergeAfterReboot = mergeAfterReboot;
+  public void setContinueMergeAfterReboot(boolean continueMergeAfterReboot) {
+    this.continueMergeAfterReboot = continueMergeAfterReboot;
   }
 
   public long getMergeIntervalSec() {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57e941e..589a2f7 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -200,6 +200,15 @@ public class IoTDBDescriptor {
       conf.setZoneID(ZoneId.of(tmpTimeZone.trim()));
       logger.info("Time zone has been set to {}", conf.getZoneID());
 
+      conf.setMergeMemoryBudget(Long.parseLong(properties.getProperty("merge_memory_budget",
+          Long.toString(conf.getMergeMemoryBudget()))));
+      conf.setMergeThreadNum(Integer.parseInt(properties.getProperty("merge_thread_num",
+          Integer.toString(conf.getMergeThreadNum()))));
+      conf.setContinueMergeAfterReboot(Boolean.parseBoolean(properties.getProperty(
+          "continue_merge_after_reboot", Boolean.toString(conf.isContinueMergeAfterReboot()))));
+      conf.setMergeIntervalSec(Long.parseLong(properties.getProperty("merge_interval_sec",
+          Long.toString(conf.getMergeIntervalSec()))));
+
     } catch (IOException e) {
       logger.warn("Cannot load config file because, use default configuration", e);
     } catch (Exception e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
index 0ab24b9..4e9a949 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
@@ -37,7 +37,7 @@ public class MergeManager implements IService {
   private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
   private static final MergeManager INSTANCE = new MergeManager();
 
-  private AtomicInteger threadNum = new AtomicInteger();
+  private AtomicInteger threadCnt = new AtomicInteger();
   private ThreadPoolExecutor mergeTaskPool;
   private ScheduledExecutorService timedMergeThreadPool;
 
@@ -56,13 +56,19 @@ public class MergeManager implements IService {
   @Override
   public void start() {
     if (mergeTaskPool == null) {
+      int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeConcurrentThreads();
+      if (threadNum <= 0) {
+        threadNum = 1;
+      }
       mergeTaskPool =
-          (ThreadPoolExecutor) Executors.newFixedThreadPool(
-              IoTDBDescriptor.getInstance().getConfig().getMergeConcurrentThreads(),
-              r -> new Thread(r, "mergeThread-" + threadNum.getAndIncrement()));
-      timedMergeThreadPool = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
-      timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, 0,
-          IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec(), TimeUnit.SECONDS);
+          (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum,
+              r -> new Thread(r, "mergeThread-" + threadCnt.getAndIncrement()));
+      long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
+      if (mergeInterval > 0) {
+        timedMergeThreadPool = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
+        timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, 0,
+            mergeInterval, TimeUnit.SECONDS);
+      }
     }
     logger.info("MergeManager started");
   }
@@ -70,14 +76,16 @@ public class MergeManager implements IService {
   @Override
   public void stop() {
     if (mergeTaskPool != null) {
-      timedMergeThreadPool.shutdownNow();
+      if (timedMergeThreadPool != null) {
+        timedMergeThreadPool.shutdownNow();
+        timedMergeThreadPool = null;
+      }
       mergeTaskPool.shutdownNow();
       logger.info("Waiting for task pool to shut down");
       while (!mergeTaskPool.isShutdown()) {
         // wait
       }
       mergeTaskPool = null;
-      timedMergeThreadPool = null;
     }
     logger.info("MergeManager stopped");
   }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f805feb..0aac9a7 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -198,7 +198,7 @@ public class StorageGroupProcessor {
       RecoverMergeTask recoverMergeTask = new RecoverMergeTask(seqTsFiles, unseqTsFiles,
           storageGroupSysDir.getPath(), this::mergeEndAction, taskName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
-      recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isMergeAfterReboot());
+      recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
     } catch (IOException | MetadataErrorException e) {
       throw new ProcessorException(e);
     }