You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2021/08/20 01:59:48 UTC
[iotdb] branch rel/0.12 updated: [IOTDB-1496][IOTDB-1569][To
rel/0.12] Timed flush memtable & Timed close TsFileProcessor (#3777)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new b8e3d84 [IOTDB-1496][IOTDB-1569][To rel/0.12] Timed flush memtable & Timed close TsFileProcessor (#3777)
b8e3d84 is described below
commit b8e3d84535115a5aee853550d1a1b87ede9d46af
Author: Alan Choo <43...@users.noreply.github.com>
AuthorDate: Fri Aug 20 09:59:18 2021 +0800
[IOTDB-1496][IOTDB-1569][To rel/0.12] Timed flush memtable & Timed close TsFileProcessor (#3777)
---
RELEASE_NOTES.md | 2 +
.../resources/conf/iotdb-engine.properties | 44 +++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 108 +++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 94 +++++++++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 154 +++++++++++++++---
.../iotdb/db/engine/memtable/AbstractMemTable.java | 7 +
.../apache/iotdb/db/engine/memtable/IMemTable.java | 2 +
.../engine/storagegroup/StorageGroupProcessor.java | 86 ++++++++++
.../db/engine/storagegroup/TsFileProcessor.java | 13 ++
.../virtualSg/VirtualStorageGroupManager.java | 27 ++++
.../apache/iotdb/db/rescon/MemTableManager.java | 4 +
.../storagegroup/StorageGroupProcessorTest.java | 178 +++++++++++++++++++--
.../apache/iotdb/db/utils/EnvironmentUtils.java | 4 +
13 files changed, 695 insertions(+), 28 deletions(-)
diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 29d5a6b..1ad2908 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -28,8 +28,10 @@
* [IOTDB-1466] Support device template
* [IOTDB-1491] UDTF query supported in cluster
* [IOTDB-1536] Support fuzzy query
+* [IOTDB-1496] Timed flush memtable
* [IOTDB-1561] Support fill by specific value
* [IOTDB-1565] Add sql: set system to readonly/writable
+* [IOTDB-1569] Timed close TsFileProcessor
* TTL can be set to the prefix path of storage group
* add JMX monitor to all ThreadPools in the server module
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 3f7178e..ea4d4e5 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -199,6 +199,50 @@ timestamp_precision=ms
# When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. The default threshold is 1 GB.
# memtable_size_threshold=1073741824
+# Whether to enable timed flushing of sequence tsfiles' memtables.
+# Datatype: boolean
+# enable_timed_flush_seq_memtable=false
+
+# If a memtable's creation time is earlier than the current time minus this interval, the memtable will be flushed to disk.
+# Only sequence tsfiles' memtables are checked.
+# The default flush interval is 12 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# seq_memtable_flush_interval_in_ms=43200000
+
+# The interval to check whether sequence memtables need flushing.
+# The default flush check interval is 1 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# seq_memtable_flush_check_interval_in_ms=3600000
+
+# Whether to enable timed flushing of unsequence tsfiles' memtables.
+# Datatype: boolean
+# enable_timed_flush_unseq_memtable=true
+
+# If a memtable's creation time is earlier than the current time minus this interval, the memtable will be flushed to disk.
+# Only unsequence tsfiles' memtables are checked.
+# The default flush interval is 12 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# unseq_memtable_flush_interval_in_ms=43200000
+
+# The interval to check whether unsequence memtables need flushing.
+# The default flush check interval is 1 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# unseq_memtable_flush_check_interval_in_ms=3600000
+
+# Whether to enable timed closing of tsfiles.
+# Datatype: boolean
+# enable_timed_close_tsfile=true
+
+# If a TsFileProcessor's working memtable is null and its last working memtable flush time is earlier than the current time minus this interval, the TsFileProcessor will be closed.
+# The default close interval is 12 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# close_tsfile_interval_after_flushing_in_ms=43200000
+
+# The interval to check whether tsfiles need closing.
+# The default close check interval is 1 * 60 * 60 * 1000. (unit: ms)
+# Datatype: long
+# close_tsfile_check_interval_in_ms=3600000
+
# When the average point number of timeseries in memtable exceeds this, the memtable is flushed to disk. The default threshold is 10000.
# avg_series_point_number_threshold=10000
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4ffd446..aa63643 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -274,6 +274,42 @@ public class IoTDBConfig {
/** When a memTable's size (in byte) exceeds this, the memtable is flushed to disk. */
private long memtableSizeThreshold = 1024 * 1024 * 1024L;
+ /** Whether to timed flush sequence tsfiles' memtables. */
+ private boolean enableTimedFlushSeqMemtable = false;
+
+ /**
+ * If a memtable's creation time is earlier than the current time minus this interval, the
+ * memtable will be flushed to disk. (Only sequence tsfiles' memtables are checked.) Unit: ms
+ */
+ private long seqMemtableFlushInterval = 12 * 60 * 60 * 1000L;
+
+ /** The interval to check whether sequence memtables need flushing. Unit: ms */
+ private long seqMemtableFlushCheckInterval = 60 * 60 * 1000L;
+
+ /** Whether to timed flush unsequence tsfiles' memtables. */
+ private boolean enableTimedFlushUnseqMemtable = true;
+
+ /**
+ * If a memtable's creation time is earlier than the current time minus this interval, the
+ * memtable will be flushed to disk. (Only unsequence tsfiles' memtables are checked.) Unit: ms
+ */
+ private long unseqMemtableFlushInterval = 12 * 60 * 60 * 1000L;
+
+ /** The interval to check whether unsequence memtables need flushing. Unit: ms */
+ private long unseqMemtableFlushCheckInterval = 60 * 60 * 1000L;
+
+ /** Whether to timed close tsfiles. */
+ private boolean enableTimedCloseTsFile = true;
+
+ /**
+ * If a TsfileProcessor's last working memtable flush time is older than current time minus this
+ * and its working memtable is null, the TsfileProcessor will be closed. Unit: ms
+ */
+ private long closeTsFileIntervalAfterFlushing = 12 * 60 * 60 * 1000L;
+
+ /** The interval to check whether tsfiles need closing. Unit: ms */
+ private long closeTsFileCheckInterval = 60 * 60 * 1000L;
+
/** When average series point number reaches this, flush the memtable to disk */
private int avgSeriesPointNumberThreshold = 10000;
@@ -1403,6 +1439,78 @@ public class IoTDBConfig {
this.memtableSizeThreshold = memtableSizeThreshold;
}
+ public boolean isEnableTimedFlushSeqMemtable() {
+ return enableTimedFlushSeqMemtable;
+ }
+
+ public void setEnableTimedFlushSeqMemtable(boolean enableTimedFlushSeqMemtable) {
+ this.enableTimedFlushSeqMemtable = enableTimedFlushSeqMemtable;
+ }
+
+ public long getSeqMemtableFlushInterval() {
+ return seqMemtableFlushInterval;
+ }
+
+ public void setSeqMemtableFlushInterval(long seqMemtableFlushInterval) {
+ this.seqMemtableFlushInterval = seqMemtableFlushInterval;
+ }
+
+ public long getSeqMemtableFlushCheckInterval() {
+ return seqMemtableFlushCheckInterval;
+ }
+
+ public void setSeqMemtableFlushCheckInterval(long seqMemtableFlushCheckInterval) {
+ this.seqMemtableFlushCheckInterval = seqMemtableFlushCheckInterval;
+ }
+
+ public boolean isEnableTimedFlushUnseqMemtable() {
+ return enableTimedFlushUnseqMemtable;
+ }
+
+ public void setEnableTimedFlushUnseqMemtable(boolean enableTimedFlushUnseqMemtable) {
+ this.enableTimedFlushUnseqMemtable = enableTimedFlushUnseqMemtable;
+ }
+
+ public long getUnseqMemtableFlushInterval() {
+ return unseqMemtableFlushInterval;
+ }
+
+ public void setUnseqMemtableFlushInterval(long unseqMemtableFlushInterval) {
+ this.unseqMemtableFlushInterval = unseqMemtableFlushInterval;
+ }
+
+ public long getUnseqMemtableFlushCheckInterval() {
+ return unseqMemtableFlushCheckInterval;
+ }
+
+ public void setUnseqMemtableFlushCheckInterval(long unseqMemtableFlushCheckInterval) {
+ this.unseqMemtableFlushCheckInterval = unseqMemtableFlushCheckInterval;
+ }
+
+ public boolean isEnableTimedCloseTsFile() {
+ return enableTimedCloseTsFile;
+ }
+
+ public void setEnableTimedCloseTsFile(boolean enableTimedCloseTsFile) {
+ this.enableTimedCloseTsFile = enableTimedCloseTsFile;
+ }
+
+ public long getCloseTsFileIntervalAfterFlushing() {
+ return closeTsFileIntervalAfterFlushing;
+ }
+
+ public void setCloseTsFileIntervalAfterFlushing(long closeTsFileIntervalAfterFlushing) {
+ this.closeTsFileIntervalAfterFlushing = closeTsFileIntervalAfterFlushing;
+ }
+
+ public long getCloseTsFileCheckInterval() {
+ return closeTsFileCheckInterval;
+ }
+
+ public void setCloseTsFileCheckInterval(long closeTsFileCheckInterval) {
+ this.closeTsFileCheckInterval = closeTsFileCheckInterval;
+ }
+
public int getAvgSeriesPointNumberThreshold() {
return avgSeriesPointNumberThreshold;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 2bdb253..7696908 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.conf;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.utils.FilePathUtils;
@@ -756,6 +757,9 @@ public class IoTDBDescriptor {
properties.getProperty("kerberos_principal", conf.getKerberosPrincipal()));
TSFileDescriptor.getInstance().getConfig().setBatchSize(conf.getBatchSize());
+ // timed flush memtable, timed close tsfile
+ loadTimedService(properties);
+
// set tsfile-format config
loadTsFileProps(properties);
@@ -968,6 +972,92 @@ public class IoTDBDescriptor {
TSFileDescriptor.getInstance().getConfig().getMaxDegreeOfIndexNode()))));
}
+ // timed flush memtable, timed close tsfile
+ private void loadTimedService(Properties properties) {
+ conf.setEnableTimedFlushSeqMemtable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_timed_flush_seq_memtable",
+ Boolean.toString(conf.isEnableTimedFlushSeqMemtable()))));
+
+ long seqMemTableFlushInterval =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "seq_memtable_flush_interval_in_ms",
+ Long.toString(conf.getSeqMemtableFlushInterval()))
+ .trim());
+ if (seqMemTableFlushInterval > 0) {
+ conf.setSeqMemtableFlushInterval(seqMemTableFlushInterval);
+ }
+
+ long seqMemTableFlushCheckInterval =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "seq_memtable_flush_check_interval_in_ms",
+ Long.toString(conf.getSeqMemtableFlushCheckInterval()))
+ .trim());
+ if (seqMemTableFlushCheckInterval > 0) {
+ conf.setSeqMemtableFlushCheckInterval(seqMemTableFlushCheckInterval);
+ }
+
+ conf.setEnableTimedFlushUnseqMemtable(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_timed_flush_unseq_memtable",
+ Boolean.toString(conf.isEnableTimedFlushUnseqMemtable()))));
+
+ long unseqMemTableFlushInterval =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "unseq_memtable_flush_interval_in_ms",
+ Long.toString(conf.getUnseqMemtableFlushInterval()))
+ .trim());
+ if (unseqMemTableFlushInterval > 0) {
+ conf.setUnseqMemtableFlushInterval(unseqMemTableFlushInterval);
+ }
+
+ long unseqMemTableFlushCheckInterval =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "unseq_memtable_flush_check_interval_in_ms",
+ Long.toString(conf.getUnseqMemtableFlushCheckInterval()))
+ .trim());
+ if (unseqMemTableFlushCheckInterval > 0) {
+ conf.setUnseqMemtableFlushCheckInterval(unseqMemTableFlushCheckInterval);
+ }
+
+ conf.setEnableTimedCloseTsFile(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "enable_timed_close_tsfile", Boolean.toString(conf.isEnableTimedCloseTsFile()))));
+
+ long closeTsFileIntervalAfterFlushing =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "close_tsfile_interval_after_flushing_in_ms",
+ Long.toString(conf.getCloseTsFileIntervalAfterFlushing()))
+ .trim());
+ if (closeTsFileIntervalAfterFlushing > 0) {
+ conf.setCloseTsFileIntervalAfterFlushing(closeTsFileIntervalAfterFlushing);
+ }
+
+ long closeTsFileCheckInterval =
+ Long.parseLong(
+ properties
+ .getProperty(
+ "close_tsfile_check_interval_in_ms",
+ Long.toString(conf.getCloseTsFileCheckInterval()))
+ .trim());
+ if (closeTsFileCheckInterval > 0) {
+ conf.setCloseTsFileCheckInterval(closeTsFileCheckInterval);
+ }
+ }
+
public void loadHotModifiedProps(Properties properties) throws QueryProcessException {
try {
// update data dirs
@@ -987,6 +1077,10 @@ public class IoTDBDescriptor {
// update WAL conf
loadWALProps(properties);
+ // update timed flush & close conf
+ loadTimedService(properties);
+ StorageEngine.getInstance().rebootTimedService();
+
long seqTsFileSize =
Long.parseLong(
properties
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index eafaf0e..49750f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -99,6 +99,7 @@ public class StorageEngine implements IService {
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final long TTL_CHECK_INTERVAL = 60 * 1000L;
+
/**
* Time range for dividing storage group, the time unit is the same with IoTDB's
* TimestampPrecision
@@ -122,6 +123,10 @@ public class StorageEngine implements IService {
private ExecutorService recoverAllSgThreadPool;
private ScheduledExecutorService ttlCheckThread;
+ private ScheduledExecutorService seqMemtableTimedFlushCheckThread;
+ private ScheduledExecutorService unseqMemtableTimedFlushCheckThread;
+ private ScheduledExecutorService tsFileTimedCloseCheckThread;
+
private TsFileFlushPolicy fileFlushPolicy = new DirectFlushPolicy();
private ExecutorService recoveryThreadPool;
// add customized listeners here for flush and close events
@@ -295,6 +300,9 @@ public class StorageEngine implements IService {
ttlCheckThread = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TTL");
ttlCheckThread.scheduleAtFixedRate(
this::checkTTL, TTL_CHECK_INTERVAL, TTL_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+ logger.info("start ttl check thread successfully.");
+
+ startTimedService();
}
private void checkTTL() {
@@ -313,20 +321,79 @@ public class StorageEngine implements IService {
}
}
+ private void startTimedService() {
+ // timed flush sequence memtable
+ if (config.isEnableTimedFlushSeqMemtable()) {
+ seqMemtableTimedFlushCheckThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TimedFlushSeqMemtable");
+ seqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
+ this::timedFlushSeqMemTable,
+ config.getSeqMemtableFlushCheckInterval(),
+ config.getSeqMemtableFlushCheckInterval(),
+ TimeUnit.MILLISECONDS);
+ logger.info("start sequence memtable timed flush check thread successfully.");
+ }
+ // timed flush unsequence memtable
+ if (config.isEnableTimedFlushUnseqMemtable()) {
+ unseqMemtableTimedFlushCheckThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TimedFlushUnSeqMemtable");
+ unseqMemtableTimedFlushCheckThread.scheduleAtFixedRate(
+ this::timedFlushUnseqMemTable,
+ config.getUnseqMemtableFlushCheckInterval(),
+ config.getUnseqMemtableFlushCheckInterval(),
+ TimeUnit.MILLISECONDS);
+ logger.info("start unsequence memtable timed flush check thread successfully.");
+ }
+ // timed close tsfile
+ if (config.isEnableTimedCloseTsFile()) {
+ tsFileTimedCloseCheckThread =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("TimedCloseTsFileProcessor");
+ tsFileTimedCloseCheckThread.scheduleAtFixedRate(
+ this::timedCloseTsFileProcessor,
+ config.getCloseTsFileCheckInterval(),
+ config.getCloseTsFileCheckInterval(),
+ TimeUnit.MILLISECONDS);
+ logger.info("start tsfile timed close check thread successfully.");
+ }
+ }
+
+ private void timedFlushSeqMemTable() {
+ try {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
+ processor.timedFlushSeqMemTable();
+ }
+ } catch (Exception e) {
+ logger.error("An error occurred when timed flushing sequence memtables", e);
+ }
+ }
+
+ private void timedFlushUnseqMemTable() {
+ try {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
+ processor.timedFlushUnseqMemTable();
+ }
+ } catch (Exception e) {
+ logger.error("An error occurred when timed flushing unsequence memtables", e);
+ }
+ }
+
+ private void timedCloseTsFileProcessor() {
+ try {
+ for (VirtualStorageGroupManager processor : processorMap.values()) {
+ processor.timedCloseTsFileProcessor();
+ }
+ } catch (Exception e) {
+ logger.error("An error occurred when timed closing tsfiles interval", e);
+ }
+ }
+
@Override
public void stop() {
syncCloseAllProcessor();
- if (ttlCheckThread != null) {
- ttlCheckThread.shutdownNow();
- try {
- ttlCheckThread.awaitTermination(60, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- logger.warn("TTL check thread still doesn't exit after 60s");
- Thread.currentThread().interrupt();
- throw new StorageEngineFailureException(
- "StorageEngine failed to stop because of " + "ttlCheckThread.", e);
- }
- }
+ stopTimedService(ttlCheckThread, "TTlCheckThread");
+ stopTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+ stopTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+ stopTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
recoveryThreadPool.shutdownNow();
if (!recoverAllSgThreadPool.isShutdown()) {
recoverAllSgThreadPool.shutdownNow();
@@ -336,7 +403,7 @@ public class StorageEngine implements IService {
logger.warn("recoverAllSgThreadPool thread still doesn't exit after 60s");
Thread.currentThread().interrupt();
throw new StorageEngineFailureException(
- "StorageEngine failed to stop because of " + "recoverAllSgThreadPool.", e);
+ "StorageEngine failed to stop because of recoverAllSgThreadPool.", e);
}
}
for (PartialPath storageGroup : IoTDB.metaManager.getAllStorageGroupPaths()) {
@@ -345,6 +412,20 @@ public class StorageEngine implements IService {
this.reset();
}
+ private void stopTimedService(ScheduledExecutorService pool, String poolName) {
+ if (pool != null) {
+ pool.shutdownNow();
+ try {
+ pool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("{} still doesn't exit after 60s", poolName);
+ Thread.currentThread().interrupt();
+ throw new StorageEngineFailureException(
+ String.format("StorageEngine failed to stop because of %s.", poolName), e);
+ }
+ }
+ }
+
@Override
public void shutdown(long milliseconds) throws ShutdownException {
try {
@@ -352,17 +433,54 @@ public class StorageEngine implements IService {
} catch (TsFileProcessorException e) {
throw new ShutdownException(e);
}
- if (ttlCheckThread != null) {
- ttlCheckThread.shutdownNow();
+ shutdownTimedService(ttlCheckThread, "TTlCheckThread");
+ shutdownTimedService(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+ shutdownTimedService(unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+ shutdownTimedService(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
+ recoveryThreadPool.shutdownNow();
+ this.reset();
+ }
+
+ private void shutdownTimedService(ScheduledExecutorService pool, String poolName) {
+ if (pool != null) {
+ pool.shutdownNow();
try {
- ttlCheckThread.awaitTermination(30, TimeUnit.SECONDS);
+ pool.awaitTermination(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- logger.warn("TTL check thread still doesn't exit after 30s");
+ logger.warn("{} still doesn't exit after 30s", poolName);
Thread.currentThread().interrupt();
}
}
- recoveryThreadPool.shutdownNow();
- this.reset();
+ }
+
+ /** Reboot the timed flush threads for sequence/unsequence memtables and the timed close tsfile thread. */
+ public void rebootTimedService() throws ShutdownException {
+ logger.info("Start rebooting all timed service.");
+
+ // exclude ttl check thread
+ stopTimedServiceAndThrow(seqMemtableTimedFlushCheckThread, "SeqMemtableTimedFlushCheckThread");
+ stopTimedServiceAndThrow(
+ unseqMemtableTimedFlushCheckThread, "UnseqMemtableTimedFlushCheckThread");
+ stopTimedServiceAndThrow(tsFileTimedCloseCheckThread, "TsFileTimedCloseCheckThread");
+
+ logger.info("Stop all timed service successfully, and now restart them.");
+
+ startTimedService();
+
+ logger.info("Reboot all timed service successfully");
+ }
+
+ private void stopTimedServiceAndThrow(ScheduledExecutorService pool, String poolName)
+ throws ShutdownException {
+ if (pool != null) {
+ pool.shutdownNow();
+ try {
+ pool.awaitTermination(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("{} still doesn't exit after 30s", poolName);
+ throw new ShutdownException(e);
+ }
+ }
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
index b49fc31..6bf79e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/AbstractMemTable.java
@@ -71,6 +71,8 @@ public abstract class AbstractMemTable implements IMemTable {
private long minPlanIndex = Long.MAX_VALUE;
+ private long createdTime = System.currentTimeMillis();
+
public AbstractMemTable() {
this.memTableMap = new HashMap<>();
}
@@ -359,4 +361,9 @@ public abstract class AbstractMemTable implements IMemTable {
maxPlanIndex = Math.max(index, maxPlanIndex);
minPlanIndex = Math.min(index, minPlanIndex);
}
+
+ @Override
+ public long getCreatedTime() {
+ return createdTime;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
index ce5de4e..98532aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/memtable/IMemTable.java
@@ -143,4 +143,6 @@ public interface IMemTable {
long getMaxPlanIndex();
long getMinPlanIndex();
+
+ long getCreatedTime();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9964d46..ab8ccdc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1421,6 +1421,92 @@ public class StorageGroupProcessor {
}
}
+ public void timedFlushSeqMemTable() {
+ writeLock("timedFlushSeqMemTable");
+ try {
+ // only check sequence tsfiles' memtables
+ List<TsFileProcessor> tsFileProcessors =
+ new ArrayList<>(workSequenceTsFileProcessors.values());
+ long timeLowerBound = System.currentTimeMillis() - config.getSeqMemtableFlushInterval();
+
+ for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
+ if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+ logger.info(
+ "Exceed sequence memtable flush interval, so flush working memtable of time partition {} in storage group {}[{}]",
+ tsFileProcessor.getTimeRangeId(),
+ logicalStorageGroupName,
+ virtualStorageGroupId);
+ fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void timedFlushUnseqMemTable() {
+ writeLock("timedFlushUnseqMemTable");
+ try {
+ // only check unsequence tsfiles' memtables
+ List<TsFileProcessor> tsFileProcessors =
+ new ArrayList<>(workUnsequenceTsFileProcessors.values());
+ long timeLowerBound = System.currentTimeMillis() - config.getUnseqMemtableFlushInterval();
+
+ for (TsFileProcessor tsFileProcessor : tsFileProcessors) {
+ if (tsFileProcessor.getWorkMemTableCreatedTime() < timeLowerBound) {
+ logger.info(
+ "Exceed unsequence memtable flush interval, so flush working memtable of time partition {} in storage group {}[{}]",
+ tsFileProcessor.getTimeRangeId(),
+ logicalStorageGroupName,
+ virtualStorageGroupId);
+ fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ public void timedCloseTsFileProcessor() {
+ writeLock("timedCloseTsFileProcessor");
+ try {
+ List<TsFileProcessor> seqTsFileProcessors =
+ new ArrayList<>(workSequenceTsFileProcessors.values());
+ long timeLowerBound =
+ System.currentTimeMillis() - config.getCloseTsFileIntervalAfterFlushing();
+ for (TsFileProcessor tsFileProcessor : seqTsFileProcessors) {
+ // working memtable is null(no more write ops) and last flush time exceeds close interval
+ if (tsFileProcessor.getWorkMemTableCreatedTime() == Long.MAX_VALUE
+ && tsFileProcessor.getLastWorkMemtableFlushTime() < timeLowerBound) {
+ logger.info(
+ "Exceed tsfile close interval, so close TsFileProcessor of time partition {} in storage group {}[{}]",
+ tsFileProcessor.getTimeRangeId(),
+ logicalStorageGroupName,
+ virtualStorageGroupId);
+ asyncCloseOneTsFileProcessor(true, tsFileProcessor);
+ }
+ }
+
+ List<TsFileProcessor> unSeqTsFileProcessors =
+ new ArrayList<>(workUnsequenceTsFileProcessors.values());
+ timeLowerBound = System.currentTimeMillis() - config.getCloseTsFileIntervalAfterFlushing();
+ for (TsFileProcessor tsFileProcessor : unSeqTsFileProcessors) {
+ // working memtable is null(no more write ops) and last flush time exceeds close interval
+ if (tsFileProcessor.getWorkMemTableCreatedTime() == Long.MAX_VALUE
+ && tsFileProcessor.getLastWorkMemtableFlushTime() < timeLowerBound) {
+ logger.info(
+ "Exceed tsfile close interval, so close TsFileProcessor of time partition {} in storage group {}[{}]",
+ tsFileProcessor.getTimeRangeId(),
+ logicalStorageGroupName,
+ virtualStorageGroupId);
+ asyncCloseOneTsFileProcessor(false, tsFileProcessor);
+ }
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
/** This method will be blocked until all tsfile processors are closed. */
public void syncCloseAllWorkingTsFileProcessors() {
synchronized (closeStorageGroupCondition) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index e35cc3a..73fcb81 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -113,6 +113,9 @@ public class TsFileProcessor {
private IMemTable workMemTable;
+ /** The last time (in ms) at which the working memtable was handed over for flushing. */
+ private long lastWorkMemtableFlushTime;
+
/** this callback is called before the workMemtable is added into the flushingMemTables. */
private final UpdateEndTimeCallBack updateLatestFlushTimeCallback;
@@ -745,6 +748,7 @@ public class TsFileProcessor {
totalMemTableSize += tobeFlushed.memSize();
}
workMemTable = null;
+ lastWorkMemtableFlushTime = System.currentTimeMillis();
FlushManager.getInstance().registerTsFileProcessor(this);
}
@@ -1217,6 +1221,15 @@ public class TsFileProcessor {
return workMemTable != null ? workMemTable.getTVListsRamCost() : 0;
}
+ /** Return Long.MAX_VALUE if workMemTable is null */
+ public long getWorkMemTableCreatedTime() {
+ return workMemTable != null ? workMemTable.getCreatedTime() : Long.MAX_VALUE;
+ }
+
+ public long getLastWorkMemtableFlushTime() {
+ return lastWorkMemtableFlushTime;
+ }
+
public boolean isSequence() {
return sequence;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
index 996b857..8ad42bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/virtualSg/VirtualStorageGroupManager.java
@@ -85,6 +85,33 @@ public class VirtualStorageGroupManager {
}
}
+ /** Push the sequence memtable flush-interval check down to all storage group processors. */
+ public void timedFlushSeqMemTable() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.timedFlushSeqMemTable();
+ }
+ }
+ }
+
+ /** Push the unsequence memtable flush-interval check down to all storage group processors. */
+ public void timedFlushUnseqMemTable() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.timedFlushUnseqMemTable();
+ }
+ }
+ }
+
+ /** Push the TsFileProcessor close-interval check down to all storage group processors. */
+ public void timedCloseTsFileProcessor() {
+ for (StorageGroupProcessor storageGroupProcessor : virtualStorageGroupProcessor) {
+ if (storageGroupProcessor != null) {
+ storageGroupProcessor.timedCloseTsFileProcessor();
+ }
+ }
+ }
+
/**
* get processor from device id
*
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
index 856ffc8..461b64b 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/MemTableManager.java
@@ -102,6 +102,10 @@ public class MemTableManager {
notifyAll();
}
+ public synchronized void close() {
+ currentMemtableNumber = 0;
+ }
+
private static class InstanceHolder {
private static final MemTableManager INSTANCE = new MemTableManager();
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 20b4cf7..6493755 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -22,11 +22,14 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.constant.TestConstant;
import org.apache.iotdb.db.engine.MetadataManagerHelper;
+import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
+import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.engine.merge.manage.MergeManager;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
+import org.apache.iotdb.db.exception.ShutdownException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -37,6 +40,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
@@ -50,6 +54,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@@ -57,6 +63,8 @@ import java.util.Collections;
import java.util.List;
public class StorageGroupProcessorTest {
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static Logger logger = LoggerFactory.getLogger(StorageGroupProcessorTest.class);
private String storageGroup = "root.vehicle.d0";
private String systemDir = TestConstant.OUTPUT_DATA_DIR.concat("info");
@@ -67,9 +75,7 @@ public class StorageGroupProcessorTest {
@Before
public void setUp() throws Exception {
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+ config.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
MetadataManagerHelper.initMetadata();
EnvironmentUtils.envSetUp();
processor = new DummySGP(systemDir, storageGroup);
@@ -83,9 +89,7 @@ public class StorageGroupProcessorTest {
EnvironmentUtils.cleanDir(TestConstant.OUTPUT_DATA_DIR);
MergeManager.getINSTANCE().stop();
EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance()
- .getConfig()
- .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ config.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
}
private void insertToStorageGroupProcessor(TSRecord record)
@@ -296,7 +300,6 @@ public class StorageGroupProcessorTest {
@Test
public void testEnableDiscardOutOfOrderDataForInsertRowPlan()
throws WriteProcessException, QueryProcessException, IllegalPathException, IOException {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
boolean defaultValue = config.isEnableDiscardOutOfOrderData();
config.setEnableDiscardOutOfOrderData(true);
@@ -338,7 +341,6 @@ public class StorageGroupProcessorTest {
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet1()
throws QueryProcessException, IllegalPathException, IOException {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
long defaultTimePartition = config.getPartitionInterval();
boolean defaultEnablePartition = config.isEnablePartition();
@@ -420,7 +422,6 @@ public class StorageGroupProcessorTest {
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet2()
throws QueryProcessException, IllegalPathException, IOException {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
long defaultTimePartition = config.getPartitionInterval();
boolean defaultEnablePartition = config.isEnablePartition();
@@ -502,7 +503,6 @@ public class StorageGroupProcessorTest {
@Test
public void testEnableDiscardOutOfOrderDataForInsertTablet3()
throws QueryProcessException, IllegalPathException, IOException {
- IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
boolean defaultEnableDiscard = config.isEnableDiscardOutOfOrderData();
long defaultTimePartition = config.getPartitionInterval();
boolean defaultEnablePartition = config.isEnablePartition();
@@ -616,6 +616,164 @@ public class StorageGroupProcessorTest {
}
}
+ /**
+  * Inserts one row to create a sequence memtable, enables timed flushing of sequence memtables
+  * with an interval of 5, triggers {@code timedFlushSeqMemTable()} and expects the
+  * MemTableManager to report zero memtables once the flush has finished.
+  *
+  * <p>The shared config is restored in a {@code finally} block so that a failing assertion does
+  * not leak the modified settings into other tests.
+  */
+ @Test
+ public void testTimedFlushSeqMemTable()
+     throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException {
+   // create one sequence memtable
+   TSRecord record = new TSRecord(10000, deviceId);
+   record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+   processor.insert(new InsertRowPlan(record));
+   Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+   // remember the current config so it can be restored whatever happens below
+   boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
+   long prevFlushInterval = config.getSeqMemtableFlushInterval();
+   try {
+     // change config & reboot timed service
+     config.setEnableTimedFlushSeqMemtable(true);
+     config.setSeqMemtableFlushInterval(5);
+     StorageEngine.getInstance().rebootTimedService();
+
+     // presumably lets the memtable age past the configured interval - confirm
+     Thread.sleep(500);
+
+     // flush the sequence memtable
+     processor.timedFlushSeqMemTable();
+
+     // wait until memtable flush task is done
+     Assert.assertEquals(1, processor.getWorkSequenceTsFileProcessors().size());
+     TsFileProcessor tsFileProcessor =
+         processor.getWorkSequenceTsFileProcessors().iterator().next();
+     FlushManager flushManager = FlushManager.getInstance();
+     int waitCnt = 0;
+     while (tsFileProcessor.getFlushingMemTableSize() != 0
+         || tsFileProcessor.isManagedByFlushManager()
+         || flushManager.getNumberOfPendingTasks() != 0
+         || flushManager.getNumberOfPendingSubTasks() != 0
+         || flushManager.getNumberOfWorkingTasks() != 0
+         || flushManager.getNumberOfWorkingSubTasks() != 0) {
+       Thread.sleep(500);
+       ++waitCnt;
+       // each iteration sleeps 500 ms, so log once every 5 seconds
+       if (waitCnt % 10 == 0) {
+         logger.info("already wait {} s", waitCnt / 2);
+       }
+     }
+
+     Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+   } finally {
+     config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
+     config.setSeqMemtableFlushInterval(prevFlushInterval);
+   }
+ }
+
+ /**
+  * Creates and closes a sequence memtable, then inserts a row with an earlier timestamp to create
+  * an unsequence memtable, enables timed flushing of unsequence memtables with an interval of 5,
+  * triggers {@code timedFlushUnseqMemTable()} and expects the MemTableManager to report zero
+  * memtables once the flush has finished.
+  *
+  * <p>The shared config is restored in a {@code finally} block so that a failing assertion does
+  * not leak the modified settings into other tests.
+  */
+ @Test
+ public void testTimedFlushUnseqMemTable()
+     throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException {
+   // create one sequence memtable & close
+   TSRecord record = new TSRecord(10000, deviceId);
+   record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+   processor.insert(new InsertRowPlan(record));
+   Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+   processor.syncCloseAllWorkingTsFileProcessors();
+   Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+   // create one unsequence memtable
+   record = new TSRecord(1, deviceId);
+   record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+   processor.insert(new InsertRowPlan(record));
+   Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+   // remember the current config so it can be restored whatever happens below
+   boolean prevEnableTimedFlushUnseqMemtable = config.isEnableTimedFlushUnseqMemtable();
+   long prevFlushInterval = config.getUnseqMemtableFlushInterval();
+   try {
+     // change config & reboot timed service
+     config.setEnableTimedFlushUnseqMemtable(true);
+     config.setUnseqMemtableFlushInterval(5);
+     StorageEngine.getInstance().rebootTimedService();
+
+     // presumably lets the memtable age past the configured interval - confirm
+     Thread.sleep(500);
+
+     // flush the unsequence memtable
+     processor.timedFlushUnseqMemTable();
+
+     // wait until memtable flush task is done
+     Assert.assertEquals(1, processor.getWorkUnsequenceTsFileProcessors().size());
+     TsFileProcessor tsFileProcessor =
+         processor.getWorkUnsequenceTsFileProcessors().iterator().next();
+     FlushManager flushManager = FlushManager.getInstance();
+     int waitCnt = 0;
+     while (tsFileProcessor.getFlushingMemTableSize() != 0
+         || tsFileProcessor.isManagedByFlushManager()
+         || flushManager.getNumberOfPendingTasks() != 0
+         || flushManager.getNumberOfPendingSubTasks() != 0
+         || flushManager.getNumberOfWorkingTasks() != 0
+         || flushManager.getNumberOfWorkingSubTasks() != 0) {
+       Thread.sleep(500);
+       ++waitCnt;
+       // each iteration sleeps 500 ms, so log once every 5 seconds
+       if (waitCnt % 10 == 0) {
+         logger.info("already wait {} s", waitCnt / 2);
+       }
+     }
+
+     Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+   } finally {
+     config.setEnableTimedFlushUnseqMemtable(prevEnableTimedFlushUnseqMemtable);
+     config.setUnseqMemtableFlushInterval(prevFlushInterval);
+   }
+ }
+
+ /**
+  * Inserts one row, enables both timed flushing of sequence memtables and timed closing of
+  * TsFile processors (both intervals set to 5), flushes the memtable, and then checks that
+  * {@code timedCloseTsFileProcessor()} moves the processor from "not marked closing" to
+  * "marked closing".
+  *
+  * <p>The shared config is restored in a {@code finally} block so that a failing assertion does
+  * not leak the modified settings into other tests.
+  */
+ @Test
+ public void testTimedCloseTsFile()
+     throws IllegalPathException, InterruptedException, WriteProcessException, ShutdownException {
+   // create one sequence memtable
+   TSRecord record = new TSRecord(10000, deviceId);
+   record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
+   processor.insert(new InsertRowPlan(record));
+   Assert.assertEquals(1, MemTableManager.getInstance().getCurrentMemtableNumber());
+
+   // remember the current config so it can be restored whatever happens below
+   boolean prevEnableTimedFlushSeqMemtable = config.isEnableTimedFlushSeqMemtable();
+   long prevFlushInterval = config.getSeqMemtableFlushInterval();
+   boolean prevEnableTimedCloseTsFile = config.isEnableTimedCloseTsFile();
+   long prevCloseTsFileInterval = config.getCloseTsFileIntervalAfterFlushing();
+   try {
+     // change config & reboot timed service
+     config.setEnableTimedFlushSeqMemtable(true);
+     config.setSeqMemtableFlushInterval(5);
+     config.setEnableTimedCloseTsFile(true);
+     config.setCloseTsFileIntervalAfterFlushing(5);
+     StorageEngine.getInstance().rebootTimedService();
+
+     // presumably lets the memtable age past the configured interval - confirm
+     Thread.sleep(500);
+
+     // flush the sequence memtable
+     processor.timedFlushSeqMemTable();
+
+     // wait until memtable flush task is done
+     Assert.assertEquals(1, processor.getWorkSequenceTsFileProcessors().size());
+     TsFileProcessor tsFileProcessor =
+         processor.getWorkSequenceTsFileProcessors().iterator().next();
+     FlushManager flushManager = FlushManager.getInstance();
+     int waitCnt = 0;
+     while (tsFileProcessor.getFlushingMemTableSize() != 0
+         || tsFileProcessor.isManagedByFlushManager()
+         || flushManager.getNumberOfPendingTasks() != 0
+         || flushManager.getNumberOfPendingSubTasks() != 0
+         || flushManager.getNumberOfWorkingTasks() != 0
+         || flushManager.getNumberOfWorkingSubTasks() != 0) {
+       Thread.sleep(500);
+       ++waitCnt;
+       // each iteration sleeps 500 ms, so log once every 5 seconds
+       if (waitCnt % 10 == 0) {
+         logger.info("already wait {} s", waitCnt / 2);
+       }
+     }
+
+     Assert.assertEquals(0, MemTableManager.getInstance().getCurrentMemtableNumber());
+     Assert.assertFalse(tsFileProcessor.alreadyMarkedClosing());
+
+     // close the tsfile
+     processor.timedCloseTsFileProcessor();
+
+     Thread.sleep(500);
+
+     Assert.assertTrue(tsFileProcessor.alreadyMarkedClosing());
+   } finally {
+     config.setEnableTimedFlushSeqMemtable(prevEnableTimedFlushSeqMemtable);
+     config.setSeqMemtableFlushInterval(prevFlushInterval);
+     config.setEnableTimedCloseTsFile(prevEnableTimedCloseTsFile);
+     config.setCloseTsFileIntervalAfterFlushing(prevCloseTsFileInterval);
+   }
+ }
+
class DummySGP extends StorageGroupProcessor {
DummySGP(String systemInfoDir, String storageGroupName) throws StorageGroupProcessorException {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index c459760..f7a9310 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.control.TracingManager;
import org.apache.iotdb.db.query.udf.service.UDFRegistrationService;
+import org.apache.iotdb.db.rescon.MemTableManager;
import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IoTDB;
@@ -147,6 +148,9 @@ public class EnvironmentUtils {
// clear system info
SystemInfo.getInstance().close();
+ // clear memtable manager info
+ MemTableManager.getInstance().close();
+
// delete all directory
cleanAllDir();
config.setSeqTsFileSize(oldSeqTsFileSize);