You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/04/14 09:23:53 UTC
[incubator-iotdb] branch master updated: Make dynamic parameter
controlable (#1042)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8519fb6 Make dynamic parameter controlable (#1042)
8519fb6 is described below
commit 8519fb60a714d36d613d8981e047b8cde2fe05b5
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Tue Apr 14 17:23:42 2020 +0800
Make dynamic parameter controlable (#1042)
* make dynamic parameter adapter and ActiveTimeSeriesCounter controlable
---
.../db/conf/adapter/IoTDBConfigDynamicAdapter.java | 6 ++
.../iotdb/db/engine/flush/MemTableFlushTask.java | 9 +-
.../db/engine/storagegroup/TsFileProcessor.java | 4 +-
.../org/apache/iotdb/db/metadata/MManager.java | 102 ++++++++++++---------
.../iotdb/db/metadata/MManagerBasicTest.java | 4 +
5 files changed, 80 insertions(+), 45 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java
index 2069043..d1be324 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java
@@ -127,6 +127,9 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter {
@Override
public synchronized boolean tryToAdaptParameters() {
+ if(!CONFIG.isEnableParameterAdapter()){
+ return true;
+ }
boolean canAdjust = true;
double ratio = CompressionRatio.getInstance().getRatio();
long memtableSizeInByte = calcMemTableSize(ratio);
@@ -218,10 +221,13 @@ public class IoTDBConfigDynamicAdapter implements IDynamicAdapter {
public void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException {
totalStorageGroup += diff;
maxMemTableNum += IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff;
+
if(!CONFIG.isEnableParameterAdapter()){
+ // the maxMemTableNum will also be set in tryToAdaptParameters, this should not move out
CONFIG.setMaxMemtableNumber(maxMemTableNum);
return;
}
+
if (!tryToAdaptParameters()) {
totalStorageGroup -= diff;
maxMemTableNum -= IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() * diff;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
index 1703844..ae35784 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/MemTableFlushTask.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.adapter.ActiveTimeSeriesCounter;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.memtable.IMemTable;
@@ -82,11 +83,15 @@ public class MemTableFlushTask {
sortTime += System.currentTimeMillis() - startTime;
encodingTaskQueue.add(new Pair<>(tvList, desc));
// register active time series to the ActiveTimeSeriesCounter
- ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ ActiveTimeSeriesCounter.getInstance().offer(storageGroup, deviceId, measurementId);
+ }
}
encodingTaskQueue.add(new EndChunkGroupIoTask());
}
- ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ ActiveTimeSeriesCounter.getInstance().updateActiveRatio(storageGroup);
+ }
noMoreEncodingTask = true;
logger.debug(
"Storage group {} memtable {}, flushing into disk: data sort time cost {} ms.",
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 7e554cf..7d7dafe 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -261,6 +261,9 @@ public class TsFileProcessor {
*/
private long getMemtableSizeThresholdBasedOnSeriesNum() {
IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ if(!config.isEnableParameterAdapter()){
+ return config.getMemtableSizeThreshold();
+ }
long memTableSize = (long) (config.getMemtableSizeThreshold() * config.getMaxMemtableNumber()
/ IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup()
* ActiveTimeSeriesCounter.getInstance()
@@ -268,7 +271,6 @@ public class TsFileProcessor {
return Math.max(memTableSize, config.getMemtableSizeThreshold());
}
-
public boolean shouldClose() {
long fileSize = tsFileResource.getFileSize();
long fileSizeThreshold = IoTDBDescriptor.getInstance().getConfig()
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 0ae8fbf..01a5e47 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -85,7 +85,7 @@ public class MManager {
// device -> DeviceMNode
private RandomDeleteCache<String, MNode> mNodeCache;
- private Map<String, Integer> seriesNumberInStorageGroups = new HashMap<>();
+ private Map<String, Integer> seriesNumberInStorageGroups;
private long maxSeriesNumberAmongStorageGroup;
private boolean initialized;
private IoTDBConfig config;
@@ -136,20 +136,22 @@ public class MManager {
try {
initFromLog(logFile);
- // storage group name -> the series number
- seriesNumberInStorageGroups = new HashMap<>();
- List<String> storageGroups = mtree.getAllStorageGroupNames();
- for (String sg : storageGroups) {
- MNode node = mtree.getNodeByPath(sg);
- seriesNumberInStorageGroups.put(sg, node.getLeafCount());
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ // storage group name -> the series number
+ seriesNumberInStorageGroups = new HashMap<>();
+ List<String> storageGroups = mtree.getAllStorageGroupNames();
+ for (String sg : storageGroups) {
+ MNode node = mtree.getNodeByPath(sg);
+ seriesNumberInStorageGroups.put(sg, node.getLeafCount());
+ }
+ if (seriesNumberInStorageGroups.isEmpty()) {
+ maxSeriesNumberAmongStorageGroup = 0;
+ } else {
+ maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
+ .max(Integer::compareTo).get();
+ }
}
- if (seriesNumberInStorageGroups.isEmpty()) {
- maxSeriesNumberAmongStorageGroup = 0;
- } else {
- maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
- .max(Integer::compareTo).get();
- }
writeToLog = true;
} catch (IOException | MetadataException e) {
mtree = new MTree();
@@ -184,7 +186,9 @@ public class MManager {
try {
this.mtree = new MTree();
this.mNodeCache.clear();
- this.seriesNumberInStorageGroups.clear();
+ if (seriesNumberInStorageGroups != null) {
+ this.seriesNumberInStorageGroups.clear();
+ }
this.maxSeriesNumberAmongStorageGroup = 0;
if (logWriter != null) {
logWriter.close();
@@ -294,10 +298,12 @@ public class MManager {
createTimeseriesWithMemoryCheckAndLog(path, dataType, encoding, compressor, props);
// update statistics
- int size = seriesNumberInStorageGroups.get(storageGroupName);
- seriesNumberInStorageGroups.put(storageGroupName, size + 1);
- if (size + 1 > maxSeriesNumberAmongStorageGroup) {
- maxSeriesNumberAmongStorageGroup = size + 1;
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ int size = seriesNumberInStorageGroups.get(storageGroupName);
+ seriesNumberInStorageGroups.put(storageGroupName, size + 1);
+ if (size + 1 > maxSeriesNumberAmongStorageGroup) {
+ maxSeriesNumberAmongStorageGroup = size + 1;
+ }
}
} finally {
lock.writeLock().unlock();
@@ -361,12 +367,16 @@ public class MManager {
public Set<String> deleteTimeseries(String prefixPath) throws MetadataException {
lock.writeLock().lock();
if (isStorageGroup(prefixPath)) {
- int size = seriesNumberInStorageGroups.get(prefixPath);
- seriesNumberInStorageGroups.put(prefixPath, 0);
- if (size == maxSeriesNumberAmongStorageGroup) {
- seriesNumberInStorageGroups.values().stream().max(Integer::compareTo)
- .ifPresent(val -> maxSeriesNumberAmongStorageGroup = val);
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ int size = seriesNumberInStorageGroups.get(prefixPath);
+ seriesNumberInStorageGroups.put(prefixPath, 0);
+ if (size == maxSeriesNumberAmongStorageGroup) {
+ seriesNumberInStorageGroups.values().stream().max(Integer::compareTo)
+ .ifPresent(val -> maxSeriesNumberAmongStorageGroup = val);
+ }
}
+
mNodeCache.clear();
}
try {
@@ -412,12 +422,15 @@ public class MManager {
} catch (ConfigAdjusterException e) {
throw new MetadataException(e);
}
- String storageGroup = getStorageGroupName(path);
- int size = seriesNumberInStorageGroups.get(storageGroup);
- seriesNumberInStorageGroups.put(storageGroup, size - 1);
- if (size == maxSeriesNumberAmongStorageGroup) {
- seriesNumberInStorageGroups.values().stream().max(Integer::compareTo)
- .ifPresent(val -> maxSeriesNumberAmongStorageGroup = val);
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ String storageGroup = getStorageGroupName(path);
+ int size = seriesNumberInStorageGroups.get(storageGroup);
+ seriesNumberInStorageGroups.put(storageGroup, size - 1);
+ if (size == maxSeriesNumberAmongStorageGroup) {
+ seriesNumberInStorageGroups.values().stream().max(Integer::compareTo)
+ .ifPresent(val -> maxSeriesNumberAmongStorageGroup = val);
+ }
}
return storageGroupName;
} finally {
@@ -441,8 +454,10 @@ public class MManager {
writer.flush();
}
IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
- ActiveTimeSeriesCounter.getInstance().init(storageGroup);
- seriesNumberInStorageGroups.put(storageGroup, 0);
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ ActiveTimeSeriesCounter.getInstance().init(storageGroup);
+ seriesNumberInStorageGroups.put(storageGroup, 0);
+ }
} catch (IOException e) {
throw new MetadataException(e.getMessage());
} catch (ConfigAdjusterException e) {
@@ -473,17 +488,20 @@ public class MManager {
writer.flush();
}
mNodeCache.clear();
- IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
- int size = seriesNumberInStorageGroups.get(storageGroup);
- IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(size * -1);
- ActiveTimeSeriesCounter.getInstance().delete(storageGroup);
- seriesNumberInStorageGroups.remove(storageGroup);
- if (size == maxSeriesNumberAmongStorageGroup) {
- if (seriesNumberInStorageGroups.isEmpty()) {
- maxSeriesNumberAmongStorageGroup = 0;
- } else {
- maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
- .max(Integer::compareTo).get();
+
+ if (IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter()) {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
+ int size = seriesNumberInStorageGroups.get(storageGroup);
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(size * -1);
+ ActiveTimeSeriesCounter.getInstance().delete(storageGroup);
+ seriesNumberInStorageGroups.remove(storageGroup);
+ if (size == maxSeriesNumberAmongStorageGroup) {
+ if (seriesNumberInStorageGroups.isEmpty()) {
+ maxSeriesNumberAmongStorageGroup = 0;
+ } else {
+ maxSeriesNumberAmongStorageGroup = seriesNumberInStorageGroups.values().stream()
+ .max(Integer::compareTo).get();
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 7ac21c6..cbd14d4 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -44,15 +44,19 @@ import org.junit.Test;
public class MManagerBasicTest {
private CompressionType compressionType;
+ private boolean canAdjust = IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter();
@Before
public void setUp() throws Exception {
+ canAdjust = IoTDBDescriptor.getInstance().getConfig().isEnableParameterAdapter();
compressionType = TSFileDescriptor.getInstance().getConfig().getCompressor();
EnvironmentUtils.envSetUp();
+ IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(true);
}
@After
public void tearDown() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setEnableParameterAdapter(canAdjust);
EnvironmentUtils.cleanEnv();
}