You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/05/18 08:42:10 UTC

[iotdb] branch dynamic_compaction updated: add hitter merge last level

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dynamic_compaction by this push:
     new a703fb5  add hitter merge last level
a703fb5 is described below

commit a703fb5605f0928f5e04016eebd9070872fc752c
Author: EJTTianyu <16...@qq.com>
AuthorDate: Tue May 18 16:41:35 2021 +0800

    add hitter merge last level
---
 .../resources/conf/iotdb-engine.properties         | 13 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 26 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  8 ++
 .../level/LevelCompactionTsFileManagement.java     | 53 ++++++++----
 .../HitterLevelCompactionTsFileManagement.java     | 97 ++++++++++++++++++++--
 .../engine/compaction/utils/CompactionUtils.java   |  4 +-
 .../engine/heavyhitter/hitter/DefaultHitter.java   | 30 +++----
 .../iotdb/db/engine/merge/manage/MergeManager.java | 35 +++++++-
 8 files changed, 215 insertions(+), 51 deletions(-)

diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 52b592b..d4baeb8 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -190,7 +190,8 @@ wal_buffer_size=16777216
 tsfile_size_threshold=1
 
 # When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 256 MB.
-memtable_size_threshold=268435456
+#memtable_size_threshold=268435456
+memtable_size_threshold=8388608
 
 # When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 10000.
 avg_series_point_number_threshold=10000
@@ -368,7 +369,7 @@ force_full_merge=false
 compaction_thread_num=10
 
 # The limit of write throughput merge can reach per second
-merge_write_throughput_mb_per_sec=1
+merge_write_throughput_mb_per_sec=8
 
 ####################
 ### Hitter Merge Configurations
@@ -378,11 +379,17 @@ merge_write_throughput_mb_per_sec=1
 query_hitter_strategy=HASH_STRATEGY
 
 # max query paths hitter contains
-max_hitter_num=500
+max_hitter_num=10
 
 # size ratio of the hitter level merge
 size_ratio=2
 
+# Maximum write throughput, in MB per second, that a hitter merge may reach
+hitter_merge_write_throughput_mb_per_sec=8
+
+# Whether to look for further merge work right after one merge finishes
+continue_merge=false
+
 ####################
 ### Metadata Cache Configuration
 ####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8af6069..48bf515 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -328,6 +328,16 @@ public class IoTDBConfig {
   private int sizeRatio = 2;
 
   /**
+   * Whether to look for further merge work right after one merge finishes.
+   */
+  private boolean conMerge = false;
+
+  /**
+   * Maximum write throughput, in MB per second, that a hitter merge may reach.
+   */
+  private int hitterMergeWriteThroughputMbPerSec = 6;
+
+  /**
    * Works when the compaction_strategy is LEVEL_COMPACTION.
    * Whether to merge unseq files into seq files or not.
    */
@@ -1502,6 +1512,22 @@ public class IoTDBConfig {
     this.sizeRatio = sizeRatio;
   }
 
+  public int getHitterMergeWriteThroughputMbPerSec() {
+    return hitterMergeWriteThroughputMbPerSec;
+  }
+
+  public void setHitterMergeWriteThroughputMbPerSec(int hitterMergeWriteThroughputMbPerSec) {
+    this.hitterMergeWriteThroughputMbPerSec = hitterMergeWriteThroughputMbPerSec;
+  }
+
+  public void setConMerge(boolean conMerge) {
+    this.conMerge = conMerge;
+  }
+
+  public boolean isConMerge() {
+    return conMerge;
+  }
+
   public CompactionStrategy getCompactionStrategy() {
     return compactionStrategy;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 41f3d6d..4367e3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -349,6 +349,14 @@ public class IoTDBDescriptor {
           .getProperty("size_ratio",
               Integer.toString(conf.getSizeRatio()))));
 
+      conf.setConMerge(
+          Boolean.parseBoolean(
+              properties.getProperty("continue_merge", Boolean.toString(conf.isConMerge()))));
+
+      conf.setHitterMergeWriteThroughputMbPerSec(
+          Integer.parseInt(properties.getProperty("hitter_merge_write_throughput_mb_per_sec",
+              Integer.toString(conf.getHitterMergeWriteThroughputMbPerSec()))));
+
       conf.setSyncEnable(Boolean
           .parseBoolean(properties.getProperty("is_sync_enable",
               Boolean.toString(conf.isSyncEnable()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 9454a14..9e78cb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.compaction.TsFileManagement;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -62,6 +63,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
   private static final Logger logger = LoggerFactory
       .getLogger(LevelCompactionTsFileManagement.class);
 
+  protected final boolean conMerge = IoTDBDescriptor.getInstance().getConfig().isConMerge();
   protected final int seqLevelNum = Math
       .max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
   private static int merge_time = 0;
@@ -487,9 +489,11 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
       merge(forkedUnSequenceTsFileResources, false, timePartition, unseqLevelNum,
           unseqFileNumInEachLevel);
     }
-    this.forkCurrentFileList(timePartition);
-    if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
-      this.merge(timePartition);
+    if (conMerge) {
+      this.forkCurrentFileList(timePartition);
+      if (forkedSequenceTsFileResources.get(0).size() >= seqFileNumInEachLevel) {
+        this.merge(timePartition);
+      }
     }
   }
 
@@ -591,19 +595,36 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
 
   protected List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
     List<SortedSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
-    for (int i = 0; i < seqLevelNum; i++) {
-      newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
-          (o1, o2) -> {
-            try {
-              int rangeCompare = Long
-                  .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
-                      Long.parseLong(o2.getTsFile().getParentFile().getName()));
-              return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
-                  : rangeCompare;
-            } catch (NumberFormatException e) {
-              return compareFileName(o1.getTsFile(), o2.getTsFile());
-            }
-          })));
+    if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy() == CompactionStrategy.HITTER_LEVEL_COMPACTION) {
+      for (int i = 0; i < seqLevelNum + 1; i++) {
+        newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
+            (o1, o2) -> {
+              try {
+                int rangeCompare = Long
+                    .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
+                        Long.parseLong(o2.getTsFile().getParentFile().getName()));
+                return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
+                    : rangeCompare;
+              } catch (NumberFormatException e) {
+                return compareFileName(o1.getTsFile(), o2.getTsFile());
+              }
+            })));
+      }
+    } else {
+      for (int i = 0; i < seqLevelNum; i++) {
+        newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
+            (o1, o2) -> {
+              try {
+                int rangeCompare = Long
+                    .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
+                        Long.parseLong(o2.getTsFile().getParentFile().getName()));
+                return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
+                    : rangeCompare;
+              } catch (NumberFormatException e) {
+                return compareFileName(o1.getTsFile(), o2.getTsFile());
+              }
+            })));
+      }
     }
     return newSequenceTsFileResources;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 4653dec..6292903 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
 import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
 import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
 import org.apache.iotdb.db.engine.heavyhitter.QueryHitterManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -56,6 +57,22 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
   private final int firstLevelNum = Math
       .max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
   private final String MERGE_SUFFIX = ".temp";
+  private boolean isFullMerging = false;
+
+  public class FullMergeTask implements Runnable {
+
+    private List<TsFileResource> mergeFileLst;
+    private long timePartitionId;
+
+    public FullMergeTask(List<TsFileResource> mergeFileLst, long timePartitionId) {
+      this.mergeFileLst = mergeFileLst;
+      this.timePartitionId = timePartitionId;
+    }
+    @Override
+    public void run() {
+      mergeFull(mergeFileLst, timePartitionId);
+    }
+  }
 
   public HitterLevelCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
     super(storageGroupName, storageGroupDir);
@@ -72,9 +89,11 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
       merge(isForceFullMerge, getTsFileList(true), forkedUnSequenceTsFileResources.get(0),
           Long.MAX_VALUE);
     }
-    this.forkCurrentFileList(timePartition);
-    if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
-      merge(timePartition);
+    if (conMerge) {
+      this.forkCurrentFileList(timePartition);
+      if (forkedSequenceTsFileResources.get(0).size() >= firstLevelNum) {
+        merge(timePartition);
+      }
     }
   }
 
@@ -208,6 +227,9 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
           }
         }
       }
+      List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 2));
+      FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, timePartition);
+      new Thread(fullMergeTask).start();
     } catch (Exception e) {
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
@@ -356,14 +378,14 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
   @Override
   public void forkCurrentFileList(long timePartition) {
     synchronized (sequenceTsFileResources) {
-      forkTsFileList(
+      forkSeqTsFileList(
           forkedSequenceTsFileResources,
           sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
           seqLevelNum);
     }
     // we have to copy all unseq file
     synchronized (unSequenceTsFileResources) {
-      forkTsFileList(
+      forkUnSeqTsFileList(
           forkedUnSequenceTsFileResources,
           unSequenceTsFileResources
               .computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
@@ -371,7 +393,40 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
     }
   }
 
-  protected void forkTsFileList(
+  protected void forkSeqTsFileList(
+      List<List<TsFileResource>> forkedTsFileResources,
+      List rawTsFileResources, int currMaxLevel) {
+    forkedTsFileResources.clear();
+    for (int i = 0; i < currMaxLevel - 1; i++) {
+      List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+      Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+          .get(i);
+      for (TsFileResource tsFileResource : levelRawTsFileResources) {
+        if (tsFileResource.isClosed()) {
+          forkedLevelTsFileResources.add(tsFileResource);
+          if (forkedLevelTsFileResources.size() >= firstLevelNum * Math.pow(sizeRatio, i)) {
+            break;
+          }
+        }
+      }
+      forkedTsFileResources.add(forkedLevelTsFileResources);
+    }
+    // get max level merge file
+    Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+        .get(currMaxLevel - 1);
+    List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+    for (TsFileResource tsFileResource: levelRawTsFileResources) {
+      if (tsFileResource.isClosed()) {
+        forkedLevelTsFileResources.add(tsFileResource);
+        if (forkedLevelTsFileResources.size() >= firstLevelNum * Math.pow(sizeRatio, currMaxLevel - 2)) {
+          break;
+        }
+      }
+    }
+    forkedTsFileResources.add(forkedLevelTsFileResources);
+  }
+
+  protected void forkUnSeqTsFileList(
       List<List<TsFileResource>> forkedTsFileResources,
       List rawTsFileResources, int currMaxLevel) {
     forkedTsFileResources.clear();
@@ -390,4 +445,34 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
       forkedTsFileResources.add(forkedLevelTsFileResources);
     }
   }
+
+  private void mergeFull(List<TsFileResource> mergeFileLst, long timePartitionId) {
+    if (isFullMerging) {
+      return;
+    }
+    try {
+      if (mergeFileLst.size() >= firstLevelNum * Math.pow(sizeRatio, seqLevelNum - 2)) {
+        isFullMerging = true;
+        CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
+            storageGroupName);
+        File newLevelFile = createNewTsFileName(mergeFileLst.get(0).getTsFile(),
+            seqLevelNum);
+        TsFileResource newResource = new TsFileResource(newLevelFile);
+        CompactionUtils
+            .merge(newResource, mergeFileLst, storageGroupName, compactionLogger,
+                new HashSet<>(), true);
+        writeLock();
+        try {
+          sequenceTsFileResources.get(timePartitionId).get(seqLevelNum - 1).add(newResource);
+          deleteLevelFilesInList(timePartitionId, mergeFileLst, seqLevelNum - 2, true);
+        } finally {
+          writeUnlock();
+        }
+        deleteLevelFilesInDisk(mergeFileLst);
+        isFullMerging = false;
+      }
+    } catch (Exception e) {
+      logger.error("Error occurred in Compaction Merge thread", e);
+    }
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 0fa0caa..d0eded9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -219,7 +219,7 @@ public class CompactionUtils {
     RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
     Map<String, List<Modification>> modificationCache = new HashMap<>();
-    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getHitterMergeWriteRateLimiter();
 
     List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedPaths);
 //    long time1 = System.nanoTime();
@@ -357,7 +357,7 @@ public class CompactionUtils {
     RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
     Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
     Map<String, List<Modification>> modificationCache = new HashMap<>();
-    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+    RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getHitterMergeWriteRateLimiter();
 
     List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedPaths);
     for (List<PartialPath> pathList : devicePaths) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
index d2802e9..0bf8b18 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.utils.MergeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * user defined hitter
+ */
 public class DefaultHitter implements QueryHeavyHitters {
 
   private static final Logger logger = LoggerFactory.getLogger(DefaultHitter.class);
@@ -45,36 +47,24 @@ public class DefaultHitter implements QueryHeavyHitters {
     // do nothing
   }
 
+  /**
+   * Returns a random selection of the time series under the given storage group.
+   * @param sgName storage group name
+   * @return top compaction series by sgName
+   */
   @Override
   public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws MetadataException {
     int totalSG = StorageEngine.getInstance().getProcessorMap().size();
     List<PartialPath> ret = new ArrayList<>();
     List<PartialPath> unmergedSeries =
         MManager.getInstance().getAllTimeseriesPath(sgName);
-//    List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries);
-//    if (devicePaths.size() > 0) {
-//      String deviceName = devicePaths.get(0).get(0).getDevice();
-//      logger.info("default hitter, top compaction device:{}", deviceName);
-//      if (IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum() == -1) {
-//        List<PartialPath> devicePath = devicePaths.get(0);
-//        for (int i = 0; i < devicePath.size() / 2; i++) {
-//          ret.add(devicePath.get(i));
-//        }
-//        return ret;
-//      }
-//      return devicePaths.get(0);
-//    }
-//    for (List<PartialPath> paths: devicePaths) {
-//      for (int i = 0; i < 500; i++){
-//        ret.add(paths.get(i));
-//      }
-//    }
+
     Collections.shuffle(unmergedSeries);
     for (int i = 0; i < IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum() / totalSG;
         i++) {
       ret.add(unmergedSeries.get(i));
     }
-    logger.info("default hitter, top compaction path:{},{}", ret.get(0), ret.get(1));
+    logger.info("default hitter, compaction series example : {}", ret.get(0));
     return ret;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index ac0e38d..3be4004 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
 import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.exception.StorageEngineException;
@@ -62,6 +63,7 @@ public class MergeManager implements IService, MergeManagerMBean {
       .format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
           getID().getJmxName());
   private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
+  private final RateLimiter hitterMergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
 
   private AtomicInteger threadCnt = new AtomicInteger();
   private ThreadPoolExecutor mergeTaskPool;
@@ -76,10 +78,24 @@ public class MergeManager implements IService, MergeManagerMBean {
   }
 
   public RateLimiter getMergeWriteRateLimiter() {
-    setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec());
+    if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
+        == CompactionStrategy.HITTER_LEVEL_COMPACTION) {
+      setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec()
+          - IoTDBDescriptor.getInstance().getConfig().getHitterMergeWriteThroughputMbPerSec());
+    } else {
+      setWriteMergeRate(
+          IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec());
+    }
     return mergeWriteRateLimiter;
   }
 
+  public RateLimiter getHitterMergeWriteRateLimiter() {
+    setHitterWriteMergeRate(
+        IoTDBDescriptor.getInstance().getConfig().getHitterMergeWriteThroughputMbPerSec());
+    return hitterMergeWriteRateLimiter;
+  }
+
+
   /**
    * wait by throughoutMbPerSec limit to avoid continuous Write Or Read
    */
@@ -96,14 +112,25 @@ public class MergeManager implements IService, MergeManagerMBean {
   private void setWriteMergeRate(final double throughoutMbPerSec) {
     double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
     // if throughout = 0, disable rate limiting
-    if (throughout == 0) {
-      throughout = Double.MAX_VALUE;
-    }
+    // if (throughout == 0) {
+      // throughout = Double.MAX_VALUE;
+    // }
     if (mergeWriteRateLimiter.getRate() != throughout) {
       mergeWriteRateLimiter.setRate(throughout);
     }
   }
 
+  private void setHitterWriteMergeRate(final double throughoutMbPerSec) {
+    double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
+    // if throughout = 0, disable rate limiting
+    // if (throughout == 0) {
+      // throughout = Double.MAX_VALUE;
+    // }
+    if (hitterMergeWriteRateLimiter.getRate() != throughout) {
+      hitterMergeWriteRateLimiter.setRate(throughout);
+    }
+  }
+
   public static MergeManager getINSTANCE() {
     return INSTANCE;
   }