You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/07/09 13:05:33 UTC
[incubator-iotdb] 02/05: add iotdb config dynamic adapter
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch dynamic_parameters
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit a0df4019e13864227fab7c4e76a1741395aa6101
Author: lta <li...@163.com>
AuthorDate: Tue Jul 9 15:37:13 2019 +0800
add iotdb config dynamic adapter
---
iotdb/iotdb/conf/iotdb-engine.properties | 5 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 +--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 -
.../iotdb/db/conf/adapter/CompressionRatio.java | 47 +++++++-
.../iotdb/db/conf/adapter/IDynamicAdapter.java | 6 +-
.../db/conf/adapter/IoTDBConfigDynamicAdapter.java | 120 +++++++++++---------
.../org/apache/iotdb/db/engine/StorageEngine.java | 7 ++
.../iotdb/db/engine/memtable/ChunkBufferPool.java | 4 +-
.../engine/storagegroup/StorageGroupProcessor.java | 54 ++++++---
.../db/engine/storagegroup/TsFileProcessor.java | 22 ++--
.../db/exception/ConfigAdjusterException.java | 11 +-
.../org/apache/iotdb/db/metadata/MManager.java | 70 ++++++++----
.../iotdb/db/qp/executor/QueryProcessExecutor.java | 2 +
.../org/apache/iotdb/db/rescon/MemTablePool.java | 19 ++--
.../adapter/IoTDBConfigDynamicAdapterTest.java | 124 +++++++++------------
.../write/writer/RestorableTsFileIOWriter.java | 1 -
16 files changed, 300 insertions(+), 212 deletions(-)
diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index 0466025..29595b2 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -100,6 +100,8 @@ force_wal_period_in_ms=10
### Memory Control Configuration
####################
+memory_allocation_ratio=6:2:2
+
# The maximum concurrent thread number for merging
# Increasing this value increases IO and CPU consumption
# Decreasing this value, when there is much unsequence data, increases disk usage, which reduces read speed
@@ -113,9 +115,6 @@ fetch_size=10000
# If WAL is enabled and the size of an insert plan is smaller than this parameter, then the insert plan will be rejected by WAL
wal_buffer_size=16777216
-# total number of memtables in memtable pool, should be set at least as twice of the number of storage groups.
-memtable_number=20
-
# time zone of server side
# default value is +08:00
# eg. +08:00, -01:00
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 82dab68..ac67ef5 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -91,8 +91,10 @@ public class IoTDBConfig {
*/
private String indexFileDir = "data/index";
-
- private int memtableNumber = 20;
+ /**
+ * Maximum MemTable number in MemTable pool.
+ */
+ private int maxMemtableNumber = 20;
/**
* The maximum concurrent thread number for merging. When the value <=0 or > CPU core number, use
@@ -301,12 +303,12 @@ public class IoTDBConfig {
this.fetchSize = fetchSize;
}
- public int getMemtableNumber() {
- return memtableNumber;
+ public int getMaxMemtableNumber() {
+ return maxMemtableNumber;
}
- void setMemtableNumber(int memtableNumber) {
- this.memtableNumber = memtableNumber;
+ public void setMaxMemtableNumber(int maxMemtableNumber) {
+ this.maxMemtableNumber = maxMemtableNumber;
}
public int getConcurrentFlushThread() {
@@ -325,7 +327,7 @@ public class IoTDBConfig {
return tsFileSizeThreshold;
}
- void setTsFileSizeThreshold(long tsFileSizeThreshold) {
+ public void setTsFileSizeThreshold(long tsFileSizeThreshold) {
this.tsFileSizeThreshold = tsFileSizeThreshold;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 57e941e..3f964ef 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -135,10 +135,6 @@ public class IoTDBDescriptor {
conf.setWalFolder(properties.getProperty("wal_dir", conf.getWalFolder()));
- conf.setMemtableNumber(Integer
- .parseInt(properties.getProperty("memtable_number",
- Integer.toString(conf.getMemtableNumber()))));
-
conf.setFlushWalThreshold(Integer
.parseInt(properties.getProperty("flush_wal_threshold",
Integer.toString(conf.getFlushWalThreshold()))));
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/CompressionRatio.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/CompressionRatio.java
index 83ced89..ff57f21 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/CompressionRatio.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/CompressionRatio.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
package org.apache.iotdb.db.conf.adapter;
import java.io.File;
@@ -30,7 +48,7 @@ public class CompressionRatio {
private String directoryPath;
- public CompressionRatio() {
+ private CompressionRatio() {
directoryPath = FilePathUtils.regularizePath(CONFIG.getSystemDir()) + COMPRESSION_RATIO_NAME;
try {
restore();
@@ -40,12 +58,15 @@ public class CompressionRatio {
}
public synchronized void updateRatio(int flushNum) throws IOException {
- File oldFile = new File(directoryPath, FILE_PREFIX + compressionRatioSum + SEPARATOR + calcuTimes);
+ File oldFile = new File(directoryPath,
+ FILE_PREFIX + compressionRatioSum + SEPARATOR + calcuTimes);
compressionRatioSum +=
(TSFileConfig.groupSizeInByte * flushNum) / CONFIG.getTsFileSizeThreshold();
calcuTimes++;
- File newFile = new File(directoryPath, FILE_PREFIX + compressionRatioSum + SEPARATOR + calcuTimes);
+ File newFile = new File(directoryPath,
+ FILE_PREFIX + compressionRatioSum + SEPARATOR + calcuTimes);
persist(oldFile, newFile);
+ IoTDBConfigDynamicAdapter.getInstance().tryToAdaptParameters();
}
public synchronized double getRatio() {
@@ -72,9 +93,9 @@ public class CompressionRatio {
long maxTimes = 0;
int maxRatioIndex = 0;
for (int i = 0; i < ratioFiles.length; i++) {
- long calcuTimes = Long.parseLong(ratioFiles[i].getName().split("-")[2]);
- if (calcuTimes > maxTimes) {
- maxTimes = calcuTimes;
+ long times = Long.parseLong(ratioFiles[i].getName().split("-")[2]);
+ if (times > maxTimes) {
+ maxTimes = times;
maxRatioIndex = i;
}
}
@@ -89,4 +110,18 @@ public class CompressionRatio {
FileUtils.forceMkdir(restoreFile);
}
}
+
+ public static CompressionRatio getInstance() {
+ return CompressionRatioHolder.INSTANCE;
+ }
+
+ private static class CompressionRatioHolder {
+
+ private static final CompressionRatio INSTANCE = new CompressionRatio();
+
+ private CompressionRatioHolder() {
+
+ }
+
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IDynamicAdapter.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IDynamicAdapter.java
index 4cfc64e..c49e209 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IDynamicAdapter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IDynamicAdapter.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.conf.adjuster;
+package org.apache.iotdb.db.conf.adapter;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-public interface IDynamicAdjuster {
+public interface IDynamicAdapter {
/**
* Init all parameters from config.
@@ -30,7 +30,7 @@ public interface IDynamicAdjuster {
/**
* Adjust parameters of memTableNumber, memTableSize and maximum tsfile size.
*/
- boolean tryToAdjustParameters();
+ boolean tryToAdaptParameters();
/**
* Add or delete storage groups
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java
index 59ad476..dca6b33 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapter.java
@@ -16,37 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.conf.adjuster;
+package org.apache.iotdb.db.conf.adapter;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.exception.ConfigAdjusterException;
-import org.apache.iotdb.db.exception.MetadataErrorException;
-import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayPool;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-public class IoTDBConfigDynamicAdjuster implements IDynamicAdjuster {
+public class IoTDBConfigDynamicAdapter implements IDynamicAdapter {
- private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBConfigDynamicAdjuster.class);
private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
// static parameter section
- private static final float MAX_MEMORY_RATIO = 0.8f;
-
- private static final float COMPRESSION_RATIO = 0.8f;
+ private static final float WRITE_MEMORY_RATIO = 0.8f;
/**
* Maximum amount of memory that the Java virtual machine will attempt to use
*/
- private static final long MAX_MEMORY_B = (long) (Runtime.getRuntime().maxMemory() * MAX_MEMORY_RATIO);
+ private static final long MAX_MEMORY_B = (long) (Runtime.getRuntime().maxMemory()
+ * WRITE_MEMORY_RATIO);
/**
* Metadata size of per timeseries, the default value is 2KB.
@@ -61,56 +54,45 @@ public class IoTDBConfigDynamicAdjuster implements IDynamicAdjuster {
/**
* Average queue length in memtable pool
*/
- private static final int MEM_TABLE_AVERAGE_QUEUE_LEN = 5;
+ public static final int MEM_TABLE_AVERAGE_QUEUE_LEN = 5;
// static memory section
- private int totalTimeseriesNum;
-
/**
* Static memory, includes all timeseries metadata, which equals to TIMESERIES_METADATA_SIZE_B *
* totalTimeseriesNum, the unit is byte
*/
private long staticMemory;
- // MemTable section
+ private int totalTimeseries;
- private int totalStorageGroupNum;
+ // MemTable section
- private int maxMemTableNum;
+ private int maxMemTableNum = MEM_TABLE_AVERAGE_QUEUE_LEN;
private int currentMemTableSize;
+ private boolean initialized = false;
+
@Override
public void init() {
- try {
- totalStorageGroupNum = MManager.getInstance().getAllStorageGroup().size();
- totalTimeseriesNum = MManager.getInstance().getPaths(IoTDBConstant.PATH_ROOT).size();
- } catch (PathErrorException e) {
- LOGGER.error("Getting total storage group num meets error, use default value 0.", e);
- } catch (MetadataErrorException e) {
- LOGGER.error("Getting total timeseries num meets error, use default value 0.", e);
- }
- maxMemTableNum = (totalStorageGroupNum << 1) + MEM_TABLE_AVERAGE_QUEUE_LEN;
- staticMemory = totalTimeseriesNum * TIMESERIES_METADATA_SIZE_B;
- tryToAdjustParameters();
}
@Override
- public boolean tryToAdjustParameters() {
+ public synchronized boolean tryToAdaptParameters() {
boolean shouldAdjust = true;
int memtableSizeInByte = calcuMemTableSize();
int memTableSizeFloorThreshold = getMemTableSizeFloorThreshold();
boolean shouldClose = false;
long tsFileSize = CONFIG.getTsFileSizeThreshold();
if (memtableSizeInByte < memTableSizeFloorThreshold) {
- memtableSizeInByte = memTableSizeFloorThreshold;
shouldClose = true;
tsFileSize = calcuTsFileSize(memTableSizeFloorThreshold);
+ memtableSizeInByte = (int) tsFileSize;
if (tsFileSize < memTableSizeFloorThreshold) {
shouldAdjust = false;
}
- } else if(memtableSizeInByte > tsFileSize){
+ } else if (memtableSizeInByte > tsFileSize) {
memtableSizeInByte = (int) tsFileSize;
}
@@ -125,28 +107,44 @@ public class IoTDBConfigDynamicAdjuster implements IDynamicAdjuster {
}
currentMemTableSize = memtableSizeInByte;
}
+ if (!initialized) {
+ CONFIG.setMaxMemtableNumber(maxMemTableNum);
+ return true;
+ }
return shouldAdjust;
}
+ /**
+ * Calculate appropriate MemTable size
+ *
+ * @return MemTable size. If the value is -1, there is no valid solution.
+ */
private int calcuMemTableSize() {
- long a = (long) (COMPRESSION_RATIO * maxMemTableNum);
- long b = (long) ((staticMemory - MAX_MEMORY_B) * COMPRESSION_RATIO);
+ double ratio = CompressionRatio.getInstance().getRatio();
+ long a = (long) (ratio * maxMemTableNum);
+ long b = (long) ((staticMemory - MAX_MEMORY_B) * ratio);
long c = CONFIG.getTsFileSizeThreshold() * maxMemTableNum * CHUNK_METADATA_SIZE_B * MManager
.getInstance().getMaximalSeriesNumberAmongStorageGroups();
- double memTableSize1 = ((-b + Math.sqrt(b * b - 4 * a * c)) / (2 * a));
- System.out.println(memTableSize1/IoTDBConstant.MB);
- System.out.println(((-b - Math.sqrt(b * b - 4 * a * c)) / (2 * a))/ IoTDBConstant.MB);
- return (int) memTableSize1;
+ long tempValue = b * b - 4 * a * c;
+ double memTableSize = ((-b + Math.sqrt(tempValue)) / (2 * a));
+ return tempValue < 0 ? -1 : (int) memTableSize;
}
+ /**
+ * Calculate appropriate Tsfile size based on MemTable size
+ *
+ * @param memTableSize MemTable size
+ * @return Tsfile threshold
+ */
private int calcuTsFileSize(int memTableSize) {
- return (int) ((MAX_MEMORY_B - maxMemTableNum * memTableSize - staticMemory) * COMPRESSION_RATIO
+ return (int) ((MAX_MEMORY_B - maxMemTableNum * memTableSize - staticMemory) * CompressionRatio
+ .getInstance().getRatio()
* memTableSize / (maxMemTableNum * CHUNK_METADATA_SIZE_B * MManager.getInstance()
.getMaximalSeriesNumberAmongStorageGroups()));
}
/**
- * Get the floor threshold memtable size
+ * Get the floor threshold MemTable size
*/
private int getMemTableSizeFloorThreshold() {
return MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups()
@@ -158,11 +156,9 @@ public class IoTDBConfigDynamicAdjuster implements IDynamicAdjuster {
*/
@Override
public void addOrDeleteStorageGroup(int diff) throws ConfigAdjusterException {
- totalStorageGroupNum += diff;
maxMemTableNum += 2 * diff;
- if (!tryToAdjustParameters()) {
- totalStorageGroupNum -= diff;
- maxMemTableNum -= 2;
+ if (!tryToAdaptParameters()) {
+ maxMemTableNum -= 2 * diff;
throw new ConfigAdjusterException(
"The IoTDB system load is too large to create storage group.");
}
@@ -170,35 +166,51 @@ public class IoTDBConfigDynamicAdjuster implements IDynamicAdjuster {
@Override
public void addOrDeleteTimeSeries(int diff) throws ConfigAdjusterException {
- totalTimeseriesNum += diff;
+ totalTimeseries += diff;
staticMemory += diff * TIMESERIES_METADATA_SIZE_B;
- if (!tryToAdjustParameters()) {
- totalTimeseriesNum -= diff;
+ if (!tryToAdaptParameters()) {
+ totalTimeseries -= diff;
staticMemory -= diff * TIMESERIES_METADATA_SIZE_B;
throw new ConfigAdjusterException("The IoTDB system load is too large to add timeseries.");
}
}
+ public void setInitialized(boolean initialized) {
+ this.initialized = initialized;
+ }
+
public int getCurrentMemTableSize() {
return currentMemTableSize;
}
- private IoTDBConfigDynamicAdjuster() {
+ public int getTotalTimeseries() {
+ return totalTimeseries;
+ }
+
+ /**
+ * Only for test
+ */
+ public void reset() {
+ totalTimeseries = 0;
+ staticMemory = 0;
+ maxMemTableNum = MEM_TABLE_AVERAGE_QUEUE_LEN;
+ }
+
+ private IoTDBConfigDynamicAdapter() {
init();
}
- public static IoTDBConfigDynamicAdjuster getInstance() {
- return IoTDBConfigAdjusterHolder.INSTANCE;
+ public static IoTDBConfigDynamicAdapter getInstance() {
+ return IoTDBConfigAdapterHolder.INSTANCE;
}
- private static class IoTDBConfigAdjusterHolder {
+ private static class IoTDBConfigAdapterHolder {
- private static final IoTDBConfigDynamicAdjuster INSTANCE = new IoTDBConfigDynamicAdjuster();
+ private static final IoTDBConfigDynamicAdapter INSTANCE = new IoTDBConfigDynamicAdapter();
- private IoTDBConfigAdjusterHolder() {
+ private IoTDBConfigAdapterHolder() {
}
}
-
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d27b0a3..7c94a4e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -189,6 +189,13 @@ public class StorageEngine implements IService {
}
}
+ public void asyncFlushAllProcessor() {
+ synchronized (processorMap) {
+ for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+ storageGroupProcessor.asyncFlush();
+ }
+ }
+ }
/**
* flush command
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
index 07624f2..af5a699 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/ChunkBufferPool.java
@@ -54,7 +54,7 @@ public class ChunkBufferPool {
//we use the memtable number * maximal series number in one StorageGroup * 2 as the capacity
int capacity =
2 * MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() * IoTDBDescriptor
- .getInstance().getConfig().getMemtableNumber() + 100000;
+ .getInstance().getConfig().getMaxMemtableNumber() + 100000;
if (availableChunkBuffer.isEmpty() && size < capacity) {
size++;
return new ChunkBuffer(schema);
@@ -93,7 +93,7 @@ public class ChunkBufferPool {
//we use the memtable number * maximal series number in one StorageGroup as the capacity
int capacity =
MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups() * IoTDBDescriptor
- .getInstance().getConfig().getMemtableNumber();
+ .getInstance().getConfig().getMaxMemtableNumber();
if (size > capacity) {
size --;
} else {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2cf59c9..30e8579 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -43,8 +43,8 @@ import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.ProcessorException;
+import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -72,8 +72,8 @@ import org.slf4j.LoggerFactory;
* (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
* shouldClose())<br/>
*
- * (2) someone calls waitForAllCurrentTsFileProcessorsClosed().
- * (up to now, only flush command from cli will call this method)<br/>
+ * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
+ * cli will call this method)<br/>
*
* UnSequence data has the similar process as above.
*
@@ -114,9 +114,9 @@ public class StorageGroupProcessor {
private TsFileProcessor workUnSequenceTsFileProcessor = null;
private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
/**
- * device -> global latest timestamp of each device
- * latestTimeForEachDevice caches non-flushed changes upon timestamps of each device, and is used
- * to update latestFlushedTimeForEachDevice when a flush is issued.
+ * device -> global latest timestamp of each device. latestTimeForEachDevice caches non-flushed
+ * changes upon timestamps of each device, and is used to update latestFlushedTimeForEachDevice
+ * when a flush is issued.
*/
private Map<String, Long> latestTimeForEachDevice = new HashMap<>();
/**
@@ -128,22 +128,22 @@ public class StorageGroupProcessor {
private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
private String storageGroupName;
/**
- * versionController assigns a version for each MemTable and deletion/update such that after
- * they are persisted, the order of insertions, deletions and updates can be re-determined.
+ * versionController assigns a version for each MemTable and deletion/update such that after they
+ * are persisted, the order of insertions, deletions and updates can be re-determined.
*/
private VersionController versionController;
/**
* mergeDeleteLock is to be used in the merge process. Concurrent deletion and merge may result in
- * losing some deletion in the merged new file, so a lock is necessary.
- * TODO reconsidering this when implementing the merge process.
+ * losing some deletions in the merged new file, so a lock is necessary. TODO: reconsider this
+ * when implementing the merge process.
*/
@SuppressWarnings("unused") // to be used in merge
private ReentrantLock mergeDeleteLock = new ReentrantLock();
/**
- * This is the modification file of the result of the current merge. Because the merged file
- * may be invisible at this moment, without this, deletion/update during merge could be lost.
+ * This is the modification file of the result of the current merge. Because the merged file may
+ * be invisible at this moment, without this, deletion/update during merge could be lost.
*/
private ModificationFile mergingModification;
@@ -455,6 +455,28 @@ public class StorageGroupProcessor {
}
}
+ public void asyncFlush() {
+ writeLock();
+ try {
+ if (workSequenceTsFileProcessor.shouldFlush()) {
+ logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+ workSequenceTsFileProcessor.getWorkMemTableMemory(),
+ workSequenceTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+ workSequenceTsFileProcessor.asyncFlush();
+ }
+ if (workUnSequenceTsFileProcessor.shouldFlush()) {
+ logger.info("The memtable size {} reaches the threshold, async flush it to tsfile: {}",
+ workUnSequenceTsFileProcessor.getWorkMemTableMemory(),
+ workUnSequenceTsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
+
+ workUnSequenceTsFileProcessor.asyncFlush();
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
private void writeLock() {
insertLock.writeLock().lock();
}
@@ -489,8 +511,8 @@ public class StorageGroupProcessor {
// left: in-memory data, right: meta of disk data
Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair;
pair = tsFileResource
- .getUnsealedFileProcessor()
- .query(deviceId, measurementId, dataType, mSchema.getProps(), context);
+ .getUnsealedFileProcessor()
+ .query(deviceId, measurementId, dataType, mSchema.getProps(), context);
tsfileResourcesForQuery
.add(new TsFileResource(tsFileResource.getFile(),
tsFileResource.getStartTimeMap(),
@@ -506,7 +528,8 @@ public class StorageGroupProcessor {
/**
- * Delete data whose timestamp <= 'timestamp' and belongs to the timeseries deviceId.measurementId.
+ * Delete data whose timestamp <= 'timestamp' and belongs to the timeseries
+ * deviceId.measurementId.
*
* @param deviceId the deviceId of the timeseries to be deleted.
* @param measurementId the measurementId of the timeseries to be deleted.
@@ -639,6 +662,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface CloseTsFileCallBack {
+
void call(TsFileProcessor caller) throws TsFileProcessorException, IOException;
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index d918e75..5a64231 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -30,12 +30,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.CompressionRatio;
import org.apache.iotdb.db.engine.StorageEngine;
-import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.MemSeriesLazyMerger;
import org.apache.iotdb.db.engine.memtable.MemTableFlushTask;
-import org.apache.iotdb.db.rescon.MemTablePool;
+import org.apache.iotdb.db.engine.memtable.NotifyFlushMemTable;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -46,6 +46,7 @@ import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.qp.constant.DatetimeUtils;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTablePool;
import org.apache.iotdb.db.utils.QueryUtils;
import org.apache.iotdb.db.writelog.manager.MultiFileLogNodeManager;
import org.apache.iotdb.db.writelog.node.WriteLogNode;
@@ -78,9 +79,8 @@ public class TsFileProcessor {
private ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
/**
- * It is set by the StorageGroupProcessor and checked by flush threads.
- * (If shouldClose == true and its flushingMemTables are all flushed, then the flush thread will
- * close this file.)
+ * It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
+ * and its flushingMemTables are all flushed, then the flush thread will close this file.)
*/
private volatile boolean shouldClose;
@@ -108,6 +108,8 @@ public class TsFileProcessor {
private boolean sequence;
+ private int totalFlushTimes;
+
TsFileProcessor(String storageGroupName, File tsfile, FileSchema fileSchema,
VersionController versionController,
CloseTsFileCallBack closeTsFileCallback,
@@ -253,6 +255,9 @@ public class TsFileProcessor {
} catch (IOException e) {
logger.error("async close failed, because", e);
}
+ CompressionRatio.getInstance().updateRatio(totalFlushTimes);
+ } catch (IOException e) {
+ logger.error("update compression ratio failed", e);
} finally {
flushQueryLock.writeLock().unlock();
}
@@ -274,7 +279,6 @@ public class TsFileProcessor {
flushQueryLock.writeLock().unlock();
}
-
synchronized (tmpMemTable) {
try {
long startWait = System.currentTimeMillis();
@@ -334,6 +338,9 @@ public class TsFileProcessor {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
getLogNode().notifyStartFlush();
}
+ if(!tobeFlushed.isSignalMemTable()){
+ totalFlushTimes++;
+ }
workMemTable = null;
FlushManager.getInstance().registerTsFileProcessor(this);
}
@@ -498,7 +505,8 @@ public class TsFileProcessor {
if (flushingMemTable.isSignalMemTable()) {
continue;
}
- ReadOnlyMemChunk memChunk = flushingMemTable.query(deviceId, measurementId, dataType, props);
+ ReadOnlyMemChunk memChunk = flushingMemTable
+ .query(deviceId, measurementId, dataType, props);
if (memChunk != null) {
memSeriesLazyMerger.addMemSeries(memChunk);
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/exception/ConfigAdjusterException.java b/iotdb/src/main/java/org/apache/iotdb/db/exception/ConfigAdjusterException.java
index b66dd69..8cfe11c 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/exception/ConfigAdjusterException.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/exception/ConfigAdjusterException.java
@@ -16,23 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.iotdb.db.exception;
-public class FlushRunTimeException extends RuntimeException {
+public class ConfigAdjusterException extends Exception {
- public FlushRunTimeException() {
+ public ConfigAdjusterException() {
}
- public FlushRunTimeException(String message) {
+ public ConfigAdjusterException(String message) {
super(message);
}
- public FlushRunTimeException(String message, Throwable cause) {
+ public ConfigAdjusterException(String message, Throwable cause) {
super(message, cause);
}
- public FlushRunTimeException(Throwable cause) {
+ public ConfigAdjusterException(Throwable cause) {
super(cause);
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index 0ae3434..5e44280 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -33,7 +33,9 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.ConfigAdjusterException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.monitor.MonitorConstants;
@@ -77,7 +79,8 @@ public class MManager {
private MManager() {
- schemaDir = IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
+ schemaDir =
+ IoTDBDescriptor.getInstance().getConfig().getSystemDir() + File.separator + "schema";
File systemFolder = new File(schemaDir);
if (!systemFolder.exists()) {
@@ -116,6 +119,7 @@ public class MManager {
};
init();
+ IoTDBConfigDynamicAdapter.getInstance().setInitialized(true);
initialized = true;
}
@@ -130,7 +134,6 @@ public class MManager {
lock.writeLock().lock();
File logFile = new File(logFilePath);
-
try {
initFromLog(logFile);
seriesNumberInStorageGroups = mgraph.countSeriesNumberInEachStorageGroup();
@@ -155,7 +158,7 @@ public class MManager {
// init the metadata from the operation log
mgraph = new MGraph(ROOT_NAME);
if (logFile.exists()) {
- try( FileReader fr = new FileReader(logFile);
+ try (FileReader fr = new FileReader(logFile);
BufferedReader br = new BufferedReader(fr)) {
String cmd;
while ((cmd = br.readLine()) != null) {
@@ -261,8 +264,8 @@ public class MManager {
* @param dataType the data type {@code DataType} for the timeseries
* @param encoding the encoding function {@code Encoding} for the timeseries
* @param compressor the compressor function {@code Compressor} for the time series
- * @return whether the measurement occurs for the first time in this storage group (if true,
- * the measurement should be registered to the StorageEngine too)
+ * @return whether the measurement occurs for the first time in this storage group (if true, the
+ * measurement should be registered to the StorageEngine too)
*/
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public boolean addPathToMTree(Path path, TSDataType dataType, TSEncoding encoding,
@@ -282,6 +285,11 @@ public class MManager {
} catch (PathErrorException e) {
throw new MetadataErrorException(e);
}
+ try {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
+ } catch (ConfigAdjusterException e) {
+ throw new MetadataErrorException(e);
+ }
// the two map is stored in the storage group node
Map<String, MeasurementSchema> schemaMap = getStorageGroupSchemaMap(fileNodePath);
Map<String, Integer> numSchemaMap = getStorageGroupNumSchemaMap(fileNodePath);
@@ -327,7 +335,6 @@ public class MManager {
CompressionType compressor, Map<String, String> props)
throws PathErrorException, IOException {
-
lock.writeLock().lock();
try {
mgraph.addPathToMTree(path, dataType, encoding, compressor, props);
@@ -411,10 +418,10 @@ public class MManager {
/**
* delete given paths from metadata.
+ *
* @param deletePathList list of paths to be deleted
- * @return a set contains StorageGroups that contain no more timeseries
- * after this deletion and files of such StorageGroups should be deleted to reclaim disk space.
- * @throws MetadataErrorException
+ * @return a set of StorageGroups that contain no more timeseries after this deletion; the
+ * files of such StorageGroups should be deleted to reclaim disk space.
*/
public Set<String> deletePaths(List<Path> deletePathList)
throws MetadataErrorException {
@@ -423,6 +430,11 @@ public class MManager {
Set<String> emptyStorageGroups = new HashSet<>();
for (String p : fullPath) {
+ try {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(-1);
+ } catch (ConfigAdjusterException e) {
+ throw new MetadataErrorException(e);
+ }
String emptiedStorageGroup = deletePath(p);
if (emptiedStorageGroup != null) {
emptyStorageGroups.add(emptiedStorageGroup);
@@ -507,19 +519,15 @@ public class MManager {
*/
public void setStorageLevelToMTree(String path) throws MetadataErrorException {
if (initialized && StorageEngine.getInstance().isReadOnly()) {
- throw new MetadataErrorException("Current system mode is read only, does not support creating Storage Group");
+ throw new MetadataErrorException(
+ "Current system mode is read only, does not support creating Storage Group");
}
lock.writeLock().lock();
try {
checkAndGetDataTypeCache.clear();
mNodeCache.clear();
- // if (current storage groups + the new storage group + the statistic storage group) * 2 > total memtable number
- if ((seriesNumberInStorageGroups.size() + 2) * 2 > IoTDBDescriptor.getInstance().getConfig()
- .getMemtableNumber()) {
- throw new PathErrorException(
- "too many storage groups, please increase the number of memtable");
- }
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
mgraph.setStorageLevel(path);
seriesNumberInStorageGroups.put(path, 0);
if (writeToLog) {
@@ -528,15 +536,23 @@ public class MManager {
writer.newLine();
writer.flush();
}
- } catch (IOException | PathErrorException e) {
+ } catch (IOException | ConfigAdjusterException e) {
+ throw new MetadataErrorException(e);
+ } catch (PathErrorException e) {
+ try {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(-1);
+ } catch (ConfigAdjusterException ex) {
+ throw new MetadataErrorException(ex);
+ }
throw new MetadataErrorException(e);
- } finally{
+ } finally {
lock.writeLock().unlock();
}
}
/**
* function for checking if the given path is storage level of mTree or not.
+ *
* @apiNote :for cluster
*/
boolean checkStorageLevelOfMTree(String path) {
@@ -747,10 +763,9 @@ public class MManager {
}
/**
- * @deprecated Get all MeasurementSchemas for given delta object type.
- *
* @param path A seriesPath represented one Delta object
* @return a list contains all column schema
+ * @deprecated Get all MeasurementSchemas for given delta object type.
*/
@Deprecated
public List<MeasurementSchema> getSchemaForOneType(String path) throws PathErrorException {
@@ -1016,8 +1031,8 @@ public class MManager {
}
/**
- * Get MeasurementSchema for given seriesPath. Notice: Path must be a complete Path from root to leaf
- * node.
+ * Get MeasurementSchema for given seriesPath. Notice: Path must be a complete Path from root to
+ * leaf node.
*/
private MeasurementSchema getSchemaForOnePath(String path) throws PathErrorException {
@@ -1170,14 +1185,23 @@ public class MManager {
}
}
+ /**
+ * Only for tests: overrides the maximum series number among storage groups.
+ */
+ public void setMaxSeriesNumberAmongStorageGroup(int maxSeriesNumberAmongStorageGroup) {
+ this.maxSeriesNumberAmongStorageGroup = maxSeriesNumberAmongStorageGroup;
+ }
+
public int getMaximalSeriesNumberAmongStorageGroups() {
return maxSeriesNumberAmongStorageGroup;
}
private static class MManagerHolder {
- private MManagerHolder(){
+
+ private MManagerHolder() {
//allowed to do nothing
}
+
private static final MManager INSTANCE = new MManager();
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
index 9a3021b..41b09d1 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/qp/executor/QueryProcessExecutor.java
@@ -33,7 +33,9 @@ import org.apache.iotdb.db.auth.authorizer.LocalFileAuthorizer;
import org.apache.iotdb.db.auth.entity.PathPrivilege;
import org.apache.iotdb.db.auth.entity.Role;
import org.apache.iotdb.db.auth.entity.User;
+import org.apache.iotdb.db.conf.adapter.IoTDBConfigDynamicAdapter;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.ConfigAdjusterException;
import org.apache.iotdb.db.exception.MetadataErrorException;
import org.apache.iotdb.db.exception.PathErrorException;
import org.apache.iotdb.db.exception.ProcessorException;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
index 0a2d21c..d3e6fda 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/rescon/MemTablePool.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.rescon;
import java.util.ArrayDeque;
import java.util.Deque;
+import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.memtable.IMemTable;
import org.apache.iotdb.db.engine.memtable.PrimitiveMemTable;
@@ -28,16 +29,12 @@ import org.slf4j.LoggerFactory;
public class MemTablePool {
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
private static final Logger logger = LoggerFactory.getLogger(MemTablePool.class);
private static final Deque<IMemTable> availableMemTables = new ArrayDeque<>();
- /**
- * >= number of storage group * 2
- * TODO check this parameter to ensure that capaity * MaxMemTable Size < JVM memory / 2
- */
- private static int capacity = IoTDBDescriptor.getInstance().getConfig().getMemtableNumber();
-
private int size = 0;
private static final int WAIT_TIME = 2000;
@@ -47,7 +44,7 @@ public class MemTablePool {
public IMemTable getAvailableMemTable(Object applier) {
synchronized (availableMemTables) {
- if (availableMemTables.isEmpty() && size < capacity) {
+ if (availableMemTables.isEmpty() && size < CONFIG.getMaxMemtableNumber()) {
size++;
logger.info("generated a new memtable for {}, system memtable size: {}, stack size: {}",
applier, size, availableMemTables.size());
@@ -84,6 +81,14 @@ public class MemTablePool {
return;
}
synchronized (availableMemTables) {
+ // Because of dynamic parameter adjustment, the maximum number of memtables may decrease.
+ if (size > CONFIG.getMaxMemtableNumber()) {
+ logger.debug(
+ "Currently the size of available MemTables is {}, the maxmin size of MemTables is {}, discard this MemTable.",
+ CONFIG.getMaxMemtableNumber(), size);
+ size--;
+ return;
+ }
memTable.clear();
availableMemTables.push(memTable);
availableMemTables.notify();
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapterTest.java b/iotdb/src/test/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapterTest.java
index b9a06de..0373e71 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapterTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/conf/adapter/IoTDBConfigDynamicAdapterTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.conf.adjuster;
+package org.apache.iotdb.db.conf.adapter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -38,116 +38,92 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-public class IoTDBConfigDynamicAdjusterTest {
+public class IoTDBConfigDynamicAdapterTest {
+
private static IoTDB daemon;
+ private static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+ private long oldTsFileThreshold = CONFIG.getTsFileSizeThreshold();
+
+ private int oldMaxMemTableNumber = CONFIG.getMaxMemtableNumber();
+
+ private int oldGroupSizeInByte = TSFileConfig.groupSizeInByte;
+
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
daemon = IoTDB.getInstance();
daemon.active();
EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
}
@After
public void tearDown() throws Exception {
daemon.stop();
EnvironmentUtils.cleanEnv();
+ CONFIG.setMaxMemtableNumber(oldMaxMemTableNumber);
+ CONFIG.setTsFileSizeThreshold(oldTsFileThreshold);
+ TSFileConfig.groupSizeInByte = oldGroupSizeInByte;
+ MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(0);
+ IoTDBConfigDynamicAdapter.getInstance().reset();
}
@Test
- public void addOrDeleteStorageGroup() {
+ public void addOrDeleteStorageGroup() throws ConfigAdjusterException {
System.out.println(
"System total memory : " + Runtime.getRuntime().maxMemory() / IoTDBConstant.MB
+ "MB");
- IoTDBConfigDynamicAdjuster.getInstance();
- MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(10000);
- try {
- IoTDBConfigDynamicAdjuster.getInstance().addOrDeleteTimeSeries(10000);
- } catch (ConfigAdjusterException e) {
- fail();
- }
- System.out.println("MemTable size floor threshold is :"
- + MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups()
- * PrimitiveArrayPool.ARRAY_SIZE * Long.BYTES * 2 / IoTDBConstant.MB + "MB");
+ int memTableNum = IoTDBConfigDynamicAdapter.MEM_TABLE_AVERAGE_QUEUE_LEN;
for (int i = 1; i < 100; i++) {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
+ }
+ MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(100);
+ for (int i = 1; i < 1000000; i++) {
try {
- IoTDBConfigDynamicAdjuster.getInstance().addOrDeleteStorageGroup(1);
- assertEquals(IoTDBConfigDynamicAdjuster.getInstance().getCurrentMemTableSize(),
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
+ memTableNum += 2;
+ assertEquals(IoTDBConfigDynamicAdapter.getInstance().getCurrentMemTableSize(),
TSFileConfig.groupSizeInByte);
- System.out.println(String.format("add %d storage groups, the memTableSize is %dMB, the tsFileSize is %dMB", i,
- TSFileConfig.groupSizeInByte / IoTDBConstant.MB,
- IoTDBDescriptor.getInstance().getConfig().getTsFileSizeThreshold() / IoTDBConstant.MB));
+ assertEquals(CONFIG.getMaxMemtableNumber(), memTableNum);
} catch (ConfigAdjusterException e) {
- fail();
+ assertEquals("The IoTDB system load is too large to create storage group.", e.getMessage());
+ System.out.println("it has created " + i + " storage groups.");
+ assertEquals(CONFIG.getMaxMemtableNumber(), memTableNum);
+ break;
}
}
}
@Test
- public void addOrDeleteTimeSeries() {
+ public void addOrDeleteTimeSeries() throws ConfigAdjusterException {
System.out.println(
"System total memory : " + Runtime.getRuntime().maxMemory() / IoTDBConstant.MB
+ "MB");
- try {
- IoTDBConfigDynamicAdjuster.getInstance().addOrDeleteStorageGroup(1);
- for(int i = 1; i <= 100000 ; i++) {
- if(i ==27780){
- System.out.println(i);
- }
- System.out.println("MemTable size floor threshold is :"
- + MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups()
- * PrimitiveArrayPool.ARRAY_SIZE * Long.BYTES * 2/ IoTDBConstant.MB + "MB");
- IoTDBConfigDynamicAdjuster.getInstance().addOrDeleteTimeSeries(1);
- MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(i);
- assertEquals(IoTDBConfigDynamicAdjuster.getInstance().getCurrentMemTableSize(),
- TSFileConfig.groupSizeInByte);
- System.out.println(String
- .format("add %d timeseries, the memTableSize is %dMB, the tsFileSize is %dMB", i,
- TSFileConfig.groupSizeInByte / IoTDBConstant.MB,
- IoTDBDescriptor.getInstance().getConfig().getTsFileSizeThreshold()
- / IoTDBConstant.MB));
- }
- } catch (ConfigAdjusterException e) {
- fail();
+ int totalTimeseries = 0;
+ for (int i = 1; i < 100; i++) {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteStorageGroup(1);
}
- }
+ MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(100);
+ for (int i = 1; i < 1000000; i++) {
+ try {
+ IoTDBConfigDynamicAdapter.getInstance().addOrDeleteTimeSeries(1);
- @Test
- public void addOrDeleteTimeSeriesSyso() throws IOException {
- int sgNum = 1;
- String fileName = "/Users/litianan/Desktop/" + sgNum + "sg.csv";
- FileWriter fw = new FileWriter(new File(fileName));
- fw.write("timeseries,memtable size(MB), memtable threshold(MB), tsfile size(MB)\n");
- System.out.println(
- "System total memory : " + Runtime.getRuntime().maxMemory() / IoTDBConstant.MB
- + "MB");
- try {
- for(int i = 1; i <= 100000 ; i++) {
- if(i % 100 == 0){
- IoTDBConfigDynamicAdjuster.getInstance().addOrDeleteStorageGroup(sgNum);
+ if (i % 10 == 0) {
+ MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(i);
}
- fw.write(String.format("%d,%d,%d,%d\n", i, TSFileConfig.groupSizeInByte / IoTDBConstant.MB, MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups()
- * PrimitiveArrayPool.ARRAY_SIZE * Long.BYTES * 2/ IoTDBConstant.MB, IoTDBDescriptor.getInstance().getConfig().getTsFileSizeThreshold()
- / IoTDBConstant.MB));
- System.out.println("MemTable size floor threshold is :"
- + MManager.getInstance().getMaximalSeriesNumberAmongStorageGroups()
- * PrimitiveArrayPool.ARRAY_SIZE * Long.BYTES * 2/ IoTDBConstant.MB + "MB");
- IoTDBConfigDynamicAdjuster.getInstance().addOrDeleteTimeSeries(1);
- MManager.getInstance().setMaxSeriesNumberAmongStorageGroup(i);
- assertEquals(IoTDBConfigDynamicAdjuster.getInstance().getCurrentMemTableSize(),
+ totalTimeseries += 1;
+ assertEquals(IoTDBConfigDynamicAdapter.getInstance().getCurrentMemTableSize(),
TSFileConfig.groupSizeInByte);
- System.out.println(String
- .format("add %d timeseries, the memTableSize is %dMB, the tsFileSize is %dMB", i,
- TSFileConfig.groupSizeInByte / IoTDBConstant.MB,
- IoTDBDescriptor.getInstance().getConfig().getTsFileSizeThreshold()
- / IoTDBConstant.MB));
+ assertEquals(IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries(),
+ totalTimeseries);
+ } catch (ConfigAdjusterException e) {
+ assertEquals("The IoTDB system load is too large to add timeseries.", e.getMessage());
+ System.out.println("it has added " + i + " timeseries.");
+ assertEquals(IoTDBConfigDynamicAdapter.getInstance().getTotalTimeseries(),
+ totalTimeseries);
+ break;
}
- } catch (ConfigAdjusterException e) {
-// fail();
}
- fw.flush();
- fw.close();
}
}
\ No newline at end of file
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 1ac08e0..f3b6881 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@ -178,5 +178,4 @@ public class RestorableTsFileIOWriter extends TsFileIOWriter {
return append;
}
-
}