You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/05/18 08:42:10 UTC
[iotdb] branch dynamic_compaction updated: add hitter merge last
level
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dynamic_compaction
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dynamic_compaction by this push:
new a703fb5 add hitter merge last level
a703fb5 is described below
commit a703fb5605f0928f5e04016eebd9070872fc752c
Author: EJTTianyu <16...@qq.com>
AuthorDate: Tue May 18 16:41:35 2021 +0800
add hitter merge last level
---
.../resources/conf/iotdb-engine.properties | 13 ++-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 26 ++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 8 ++
.../level/LevelCompactionTsFileManagement.java | 53 ++++++++----
.../HitterLevelCompactionTsFileManagement.java | 97 ++++++++++++++++++++--
.../engine/compaction/utils/CompactionUtils.java | 4 +-
.../engine/heavyhitter/hitter/DefaultHitter.java | 30 +++----
.../iotdb/db/engine/merge/manage/MergeManager.java | 35 +++++++-
8 files changed, 215 insertions(+), 51 deletions(-)
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 52b592b..d4baeb8 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -190,7 +190,8 @@ wal_buffer_size=16777216
tsfile_size_threshold=1
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 256 MB.
-memtable_size_threshold=268435456
+#memtable_size_threshold=268435456
+memtable_size_threshold=8388608
# When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 10000.
avg_series_point_number_threshold=10000
@@ -368,7 +369,7 @@ force_full_merge=false
compaction_thread_num=10
# The limit of write throughput merge can reach per second
-merge_write_throughput_mb_per_sec=1
+merge_write_throughput_mb_per_sec=8
####################
### Hitter Merge Configurations
@@ -378,11 +379,17 @@ merge_write_throughput_mb_per_sec=1
query_hitter_strategy=HASH_STRATEGY
# max query paths hitter contains
-max_hitter_num=500
+max_hitter_num=10
# size ratio of the hitter level merge
size_ratio=2
+# The limit of write throughput merge can reach per second for hitter merge
+hitter_merge_write_throughput_mb_per_sec=8
+
+# continue merge after one merge
+continue_merge=false
+
####################
### Metadata Cache Configuration
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 8af6069..48bf515 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -328,6 +328,16 @@ public class IoTDBConfig {
private int sizeRatio = 2;
/**
+ * continue merge after one merge
+ */
+ private boolean conMerge = false;
+
+ /**
+ * The limit of write throughput merge can reach per second for hitter merge
+ */
+ private int hitterMergeWriteThroughputMbPerSec = 6;
+
+ /**
* Works when the compaction_strategy is LEVEL_COMPACTION.
* Whether to merge unseq files into seq files or not.
*/
@@ -1502,6 +1512,22 @@ public class IoTDBConfig {
this.sizeRatio = sizeRatio;
}
+ public int getHitterMergeWriteThroughputMbPerSec() {
+ return hitterMergeWriteThroughputMbPerSec;
+ }
+
+ public void setHitterMergeWriteThroughputMbPerSec(int hitterMergeWriteThroughputMbPerSec) {
+ this.hitterMergeWriteThroughputMbPerSec = hitterMergeWriteThroughputMbPerSec;
+ }
+
+ public void setConMerge(boolean conMerge) {
+ this.conMerge = conMerge;
+ }
+
+ public boolean isConMerge() {
+ return conMerge;
+ }
+
public CompactionStrategy getCompactionStrategy() {
return compactionStrategy;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 41f3d6d..4367e3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -349,6 +349,14 @@ public class IoTDBDescriptor {
.getProperty("size_ratio",
Integer.toString(conf.getSizeRatio()))));
+ conf.setConMerge(
+ Boolean.parseBoolean(
+ properties.getProperty("continue_merge", Boolean.toString(conf.isConMerge()))));
+
+ conf.setHitterMergeWriteThroughputMbPerSec(
+ Integer.parseInt(properties.getProperty("hitter_merge_write_throughput_mb_per_sec",
+ Integer.toString(conf.getHitterMergeWriteThroughputMbPerSec()))));
+
conf.setSyncEnable(Boolean
.parseBoolean(properties.getProperty("is_sync_enable",
Boolean.toString(conf.isSyncEnable()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index 9454a14..9e78cb4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -42,6 +42,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.compaction.TsFileManagement;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogAnalyzer;
import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
@@ -62,6 +63,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
private static final Logger logger = LoggerFactory
.getLogger(LevelCompactionTsFileManagement.class);
+ protected final boolean conMerge = IoTDBDescriptor.getInstance().getConfig().isConMerge();
protected final int seqLevelNum = Math
.max(IoTDBDescriptor.getInstance().getConfig().getSeqLevelNum(), 1);
private static int merge_time = 0;
@@ -487,9 +489,11 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
merge(forkedUnSequenceTsFileResources, false, timePartition, unseqLevelNum,
unseqFileNumInEachLevel);
}
- this.forkCurrentFileList(timePartition);
- if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
- this.merge(timePartition);
+ if (conMerge) {
+ this.forkCurrentFileList(timePartition);
+ if (forkedSequenceTsFileResources.get(0).size() >= seqFileNumInEachLevel) {
+ this.merge(timePartition);
+ }
}
}
@@ -591,19 +595,36 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
protected List<SortedSet<TsFileResource>> newSequenceTsFileResources(Long k) {
List<SortedSet<TsFileResource>> newSequenceTsFileResources = new CopyOnWriteArrayList<>();
- for (int i = 0; i < seqLevelNum; i++) {
- newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
- (o1, o2) -> {
- try {
- int rangeCompare = Long
- .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
- Long.parseLong(o2.getTsFile().getParentFile().getName()));
- return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
- : rangeCompare;
- } catch (NumberFormatException e) {
- return compareFileName(o1.getTsFile(), o2.getTsFile());
- }
- })));
+ if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy() == CompactionStrategy.HITTER_LEVEL_COMPACTION) {
+ for (int i = 0; i < seqLevelNum + 1; i++) {
+ newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
+ (o1, o2) -> {
+ try {
+ int rangeCompare = Long
+ .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
+ Long.parseLong(o2.getTsFile().getParentFile().getName()));
+ return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
+ : rangeCompare;
+ } catch (NumberFormatException e) {
+ return compareFileName(o1.getTsFile(), o2.getTsFile());
+ }
+ })));
+ }
+ } else {
+ for (int i = 0; i < seqLevelNum; i++) {
+ newSequenceTsFileResources.add(Collections.synchronizedSortedSet(new TreeSet<>(
+ (o1, o2) -> {
+ try {
+ int rangeCompare = Long
+ .compare(Long.parseLong(o1.getTsFile().getParentFile().getName()),
+ Long.parseLong(o2.getTsFile().getParentFile().getName()));
+ return rangeCompare == 0 ? compareFileName(o1.getTsFile(), o2.getTsFile())
+ : rangeCompare;
+ } catch (NumberFormatException e) {
+ return compareFileName(o1.getTsFile(), o2.getTsFile());
+ }
+ })));
+ }
}
return newSequenceTsFileResources;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
index 4653dec..6292903 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/hitter/HitterLevelCompactionTsFileManagement.java
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.cache.ChunkMetadataCache;
import org.apache.iotdb.db.engine.compaction.level.LevelCompactionTsFileManagement;
+import org.apache.iotdb.db.engine.compaction.utils.CompactionLogger;
import org.apache.iotdb.db.engine.compaction.utils.CompactionUtils;
import org.apache.iotdb.db.engine.heavyhitter.QueryHitterManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -56,6 +57,22 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
private final int firstLevelNum = Math
.max(IoTDBDescriptor.getInstance().getConfig().getSeqFileNumInEachLevel(), 1);
private final String MERGE_SUFFIX = ".temp";
+ private boolean isFullMerging = false;
+
+ public class FullMergeTask implements Runnable {
+
+ private List<TsFileResource> mergeFileLst;
+ private long timePartitionId;
+
+ public FullMergeTask(List<TsFileResource> mergeFileLst, long timePartitionId) {
+ this.mergeFileLst = mergeFileLst;
+ this.timePartitionId = timePartitionId;
+ }
+ @Override
+ public void run() {
+ mergeFull(mergeFileLst, timePartitionId);
+ }
+ }
public HitterLevelCompactionTsFileManagement(String storageGroupName, String storageGroupDir) {
super(storageGroupName, storageGroupDir);
@@ -72,9 +89,11 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
merge(isForceFullMerge, getTsFileList(true), forkedUnSequenceTsFileResources.get(0),
Long.MAX_VALUE);
}
- this.forkCurrentFileList(timePartition);
- if (!forkedSequenceTsFileResources.get(0).isEmpty()) {
- merge(timePartition);
+ if (conMerge) {
+ this.forkCurrentFileList(timePartition);
+ if (forkedSequenceTsFileResources.get(0).size() >= firstLevelNum) {
+ merge(timePartition);
+ }
}
}
@@ -208,6 +227,9 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
}
}
}
+ List<TsFileResource> fullMergeRes = new ArrayList<>(mergeResources.get(seqLevelNum - 2));
+ FullMergeTask fullMergeTask = new FullMergeTask(fullMergeRes, timePartition);
+ new Thread(fullMergeTask).start();
} catch (Exception e) {
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
@@ -356,14 +378,14 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
@Override
public void forkCurrentFileList(long timePartition) {
synchronized (sequenceTsFileResources) {
- forkTsFileList(
+ forkSeqTsFileList(
forkedSequenceTsFileResources,
sequenceTsFileResources.computeIfAbsent(timePartition, this::newSequenceTsFileResources),
seqLevelNum);
}
// we have to copy all unseq file
synchronized (unSequenceTsFileResources) {
- forkTsFileList(
+ forkUnSeqTsFileList(
forkedUnSequenceTsFileResources,
unSequenceTsFileResources
.computeIfAbsent(timePartition, this::newUnSequenceTsFileResources),
@@ -371,7 +393,40 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
}
}
- protected void forkTsFileList(
+ protected void forkSeqTsFileList(
+ List<List<TsFileResource>> forkedTsFileResources,
+ List rawTsFileResources, int currMaxLevel) {
+ forkedTsFileResources.clear();
+ for (int i = 0; i < currMaxLevel - 1; i++) {
+ List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+ Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+ .get(i);
+ for (TsFileResource tsFileResource : levelRawTsFileResources) {
+ if (tsFileResource.isClosed()) {
+ forkedLevelTsFileResources.add(tsFileResource);
+ if (forkedLevelTsFileResources.size() >= firstLevelNum * Math.pow(sizeRatio, i)) {
+ break;
+ }
+ }
+ }
+ forkedTsFileResources.add(forkedLevelTsFileResources);
+ }
+ // get max level merge file
+ Collection<TsFileResource> levelRawTsFileResources = (Collection<TsFileResource>) rawTsFileResources
+ .get(currMaxLevel - 1);
+ List<TsFileResource> forkedLevelTsFileResources = new ArrayList<>();
+ for (TsFileResource tsFileResource: levelRawTsFileResources) {
+ if (tsFileResource.isClosed()) {
+ forkedLevelTsFileResources.add(tsFileResource);
+ if (forkedLevelTsFileResources.size() >= firstLevelNum * Math.pow(sizeRatio, currMaxLevel - 2)) {
+ break;
+ }
+ }
+ }
+ forkedTsFileResources.add(forkedLevelTsFileResources);
+ }
+
+ protected void forkUnSeqTsFileList(
List<List<TsFileResource>> forkedTsFileResources,
List rawTsFileResources, int currMaxLevel) {
forkedTsFileResources.clear();
@@ -390,4 +445,34 @@ public class HitterLevelCompactionTsFileManagement extends LevelCompactionTsFile
forkedTsFileResources.add(forkedLevelTsFileResources);
}
}
+
+ private void mergeFull(List<TsFileResource> mergeFileLst, long timePartitionId) {
+ if (isFullMerging) {
+ return;
+ }
+ try {
+ if (mergeFileLst.size() >= firstLevelNum * Math.pow(sizeRatio, seqLevelNum - 2)) {
+ isFullMerging = true;
+ CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
+ storageGroupName);
+ File newLevelFile = createNewTsFileName(mergeFileLst.get(0).getTsFile(),
+ seqLevelNum);
+ TsFileResource newResource = new TsFileResource(newLevelFile);
+ CompactionUtils
+ .merge(newResource, mergeFileLst, storageGroupName, compactionLogger,
+ new HashSet<>(), true);
+ writeLock();
+ try {
+ sequenceTsFileResources.get(timePartitionId).get(seqLevelNum - 1).add(newResource);
+ deleteLevelFilesInList(timePartitionId, mergeFileLst, seqLevelNum - 2, true);
+ } finally {
+ writeUnlock();
+ }
+ deleteLevelFilesInDisk(mergeFileLst);
+ isFullMerging = false;
+ }
+ } catch (Exception e) {
+ logger.error("Error occurred in Compaction Merge thread", e);
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
index 0fa0caa..d0eded9 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/utils/CompactionUtils.java
@@ -219,7 +219,7 @@ public class CompactionUtils {
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
Map<String, List<Modification>> modificationCache = new HashMap<>();
- RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+ RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getHitterMergeWriteRateLimiter();
List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedPaths);
// long time1 = System.nanoTime();
@@ -357,7 +357,7 @@ public class CompactionUtils {
RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(targetResource.getTsFile());
Map<String, TsFileSequenceReader> tsFileSequenceReaderMap = new HashMap<>();
Map<String, List<Modification>> modificationCache = new HashMap<>();
- RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getMergeWriteRateLimiter();
+ RateLimiter compactionWriteRateLimiter = MergeManager.getINSTANCE().getHitterMergeWriteRateLimiter();
List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedPaths);
for (List<PartialPath> pathList : devicePaths) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
index d2802e9..0bf8b18 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/heavyhitter/hitter/DefaultHitter.java
@@ -28,10 +28,12 @@ import org.apache.iotdb.db.engine.heavyhitter.QueryHeavyHitters;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
-import org.apache.iotdb.db.utils.MergeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * user defined hitter
+ */
public class DefaultHitter implements QueryHeavyHitters {
private static final Logger logger = LoggerFactory.getLogger(DefaultHitter.class);
@@ -45,36 +47,24 @@ public class DefaultHitter implements QueryHeavyHitters {
// do nothing
}
+ /**
+ *
+ * @param sgName storage group name
+ * @return top compaction series by sgName
+ */
@Override
public List<PartialPath> getTopCompactionSeries(PartialPath sgName) throws MetadataException {
int totalSG = StorageEngine.getInstance().getProcessorMap().size();
List<PartialPath> ret = new ArrayList<>();
List<PartialPath> unmergedSeries =
MManager.getInstance().getAllTimeseriesPath(sgName);
-// List<List<PartialPath>> devicePaths = MergeUtils.splitPathsByDevice(unmergedSeries);
-// if (devicePaths.size() > 0) {
-// String deviceName = devicePaths.get(0).get(0).getDevice();
-// logger.info("default hitter, top compaction device:{}", deviceName);
-// if (IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum() == -1) {
-// List<PartialPath> devicePath = devicePaths.get(0);
-// for (int i = 0; i < devicePath.size() / 2; i++) {
-// ret.add(devicePath.get(i));
-// }
-// return ret;
-// }
-// return devicePaths.get(0);
-// }
-// for (List<PartialPath> paths: devicePaths) {
-// for (int i = 0; i < 500; i++){
-// ret.add(paths.get(i));
-// }
-// }
+
Collections.shuffle(unmergedSeries);
for (int i = 0; i < IoTDBDescriptor.getInstance().getConfig().getMaxHitterNum() / totalSG;
i++) {
ret.add(unmergedSeries.get(i));
}
- logger.info("default hitter, top compaction path:{},{}", ret.get(0), ret.get(1));
+ logger.info("default hitter, compaction series example : {}", ret.get(0));
return ret;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
index ac0e38d..3be4004 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeManager.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.engine.merge.task.MergeMultiChunkTask.MergeChunkHeapTask;
import org.apache.iotdb.db.engine.merge.task.MergeTask;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -62,6 +63,7 @@ public class MergeManager implements IService, MergeManagerMBean {
.format("%s:%s=%s", IoTDBConstant.IOTDB_PACKAGE, IoTDBConstant.JMX_TYPE,
getID().getJmxName());
private final RateLimiter mergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
+ private final RateLimiter hitterMergeWriteRateLimiter = RateLimiter.create(Double.MAX_VALUE);
private AtomicInteger threadCnt = new AtomicInteger();
private ThreadPoolExecutor mergeTaskPool;
@@ -76,10 +78,24 @@ public class MergeManager implements IService, MergeManagerMBean {
}
public RateLimiter getMergeWriteRateLimiter() {
- setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec());
+ if (IoTDBDescriptor.getInstance().getConfig().getCompactionStrategy()
+ == CompactionStrategy.HITTER_LEVEL_COMPACTION) {
+ setWriteMergeRate(IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec()
+ - IoTDBDescriptor.getInstance().getConfig().getHitterMergeWriteThroughputMbPerSec());
+ } else {
+ setWriteMergeRate(
+ IoTDBDescriptor.getInstance().getConfig().getMergeWriteThroughputMbPerSec());
+ }
return mergeWriteRateLimiter;
}
+ public RateLimiter getHitterMergeWriteRateLimiter() {
+ setHitterWriteMergeRate(
+ IoTDBDescriptor.getInstance().getConfig().getHitterMergeWriteThroughputMbPerSec());
+ return hitterMergeWriteRateLimiter;
+ }
+
+
/**
* wait by throughoutMbPerSec limit to avoid continuous Write Or Read
*/
@@ -96,14 +112,25 @@ public class MergeManager implements IService, MergeManagerMBean {
private void setWriteMergeRate(final double throughoutMbPerSec) {
double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
// if throughout = 0, disable rate limiting
- if (throughout == 0) {
- throughout = Double.MAX_VALUE;
- }
+ // if (throughout == 0) {
+ // throughout = Double.MAX_VALUE;
+ // }
if (mergeWriteRateLimiter.getRate() != throughout) {
mergeWriteRateLimiter.setRate(throughout);
}
}
+ private void setHitterWriteMergeRate(final double throughoutMbPerSec) {
+ double throughout = throughoutMbPerSec * 1024.0 * 1024.0;
+ // if throughout = 0, disable rate limiting
+ // if (throughout == 0) {
+ // throughout = Double.MAX_VALUE;
+ // }
+ if (hitterMergeWriteRateLimiter.getRate() != throughout) {
+ hitterMergeWriteRateLimiter.setRate(throughout);
+ }
+ }
+
public static MergeManager getINSTANCE() {
return INSTANCE;
}