You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/04 09:58:24 UTC
[iotdb] 01/01: finish sliding window version 1
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5207bace0f75bc1beccb33994d5b9a6c7a81d464
Author: EJTTianyu <16...@qq.com>
AuthorDate: Fri Dec 4 17:57:47 2020 +0800
finish sliding window version 1
---
.../apache/iotdb/tsfile/TsFileSequenceRead.java | 2 +-
.../resources/conf/iotdb-engine.properties | 13 +++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 39 ++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 11 +++
.../apache/iotdb/db/engine/flush/FlushManager.java | 21 +++++
.../engine/storagegroup/StorageGroupProcessor.java | 101 ++++++++++++++++-----
.../db/engine/storagegroup/TsFileProcessor.java | 73 ++++++++++++---
.../iotdb/db/qp/physical/crud/InsertRowPlan.java | 11 +++
.../org/apache/iotdb/db/rescon/SystemInfo.java | 5 +
.../storagegroup/StorageGroupProcessorTest.java | 25 +++++
10 files changed, 263 insertions(+), 38 deletions(-)
diff --git a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
index da5bae6..4cbf862 100644
--- a/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
+++ b/example/tsfile/src/main/java/org/apache/iotdb/tsfile/TsFileSequenceRead.java
@@ -41,7 +41,7 @@ public class TsFileSequenceRead {
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static void main(String[] args) throws IOException {
- String filename = "test.tsfile";
+ String filename = "/Users/tianyu/2019秋季学期/iotdb/server/target/data/sequence/root.vehicle.d0/0/1607075644401-1-0.tsfile";
if (args.length >= 1) {
filename = args[0];
}
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index b0a691b..1f296ef 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -232,6 +232,9 @@ primitive_array_size=128
# Ratio of write memory for invoking flush disk, 0.3 by default
flush_proportion=0.3
+# Ratio of write memory for invoking flush immediately, 0.6 by default
+force_flush_proportion=0.6
+
# Ratio of write memory allocated for buffered arrays, 0.6 by default
buffered_arrays_memory_proportion=0.6
@@ -255,6 +258,16 @@ max_waiting_time_when_insert_blocked=10000
estimated_series_size=300
####################
+### Sliding Memory Table Configurations
+####################
+
+# Whether to enable sliding memory table
+enable_sliding_mem_table=true
+
+# How long (in milliseconds) a flushing memtable is kept in memory before it is flushed.
+# Keeping it for a while can help reduce the unsequence ratio.
+flush_wait_time=60000
+
+####################
### Upgrade Configurations
####################
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 72718e2..a337152 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -152,6 +152,11 @@ public class IoTDBConfig {
private double flushProportion = 0.3;
/**
+ * Force flush proportion for system
+ */
+ private double forceFlushProportion = 0.6;
+
+ /**
* Reject proportion for system
*/
private double rejectProportion = 0.8;
@@ -205,6 +210,16 @@ public class IoTDBConfig {
private int estimatedSeriesSize = 300;
/**
+ * Whether to enable sliding memory table
+ */
+ private boolean enableSlidingMemTable = true;
+
+ /**
+ * How long a flushing memtable is kept in memory before it is flushed; keeping it for a while
+ * can help reduce the unsequence ratio. Unit: milliseconds.
+ */
+ private int flushWaitTime = 60000;
+
+ /**
* default base dir, stores all IoTDB runtime files
*/
private static final String DEFAULT_BASE_DIR = "data";
@@ -989,6 +1004,22 @@ public class IoTDBConfig {
this.forceWalPeriodInMs = forceWalPeriodInMs;
}
+ /**
+  * @return whether the sliding memory table is enabled
+  */
+ public boolean isEnableSlidingMemTable() {
+   return this.enableSlidingMemTable;
+ }
+
+ /**
+  * @param enableSlidingMemTable whether to enable the sliding memory table
+  */
+ public void setEnableSlidingMemTable(boolean enableSlidingMemTable) {
+   this.enableSlidingMemTable = enableSlidingMemTable;
+ }
+
+ /**
+  * @return the period, in milliseconds, a flushing memtable is kept in memory
+  */
+ public int getFlushWaitTime() {
+   return this.flushWaitTime;
+ }
+
+ /**
+  * @param flushWaitTime the period, in milliseconds, a flushing memtable is kept in memory
+  */
+ public void setFlushWaitTime(int flushWaitTime) {
+   this.flushWaitTime = flushWaitTime;
+ }
+
public String getSystemDir() {
return systemDir;
}
@@ -1261,6 +1292,14 @@ public class IoTDBConfig {
this.flushProportion = flushProportion;
}
+ /**
+  * @return the force flush proportion for the system
+  */
+ public double getForceFlushProportion() {
+   return this.forceFlushProportion;
+ }
+
+ /**
+  * @param forceFlushProportion the force flush proportion for the system
+  */
+ public void setForceFlushProportion(double forceFlushProportion) {
+   this.forceFlushProportion = forceFlushProportion;
+ }
+
public double getRejectProportion() {
return rejectProportion;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index d7a7058..ac2c686 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -205,6 +205,9 @@ public class IoTDBDescriptor {
conf.setFlushProportion(Double.parseDouble(properties.getProperty("flush_proportion",
Double.toString(conf.getFlushProportion()))));
+ conf.setForceFlushProportion(Double.parseDouble(properties.getProperty("force_flush_proportion",
+ Double.toString(conf.getForceFlushProportion()))));
+
conf.setRejectProportion(Double.parseDouble(properties.getProperty("reject_proportion",
Double.toString(conf.getRejectProportion()))));
@@ -295,6 +298,14 @@ public class IoTDBDescriptor {
.getProperty("estimated_series_size",
Integer.toString(conf.getEstimatedSeriesSize()))));
+ conf.setEnableSlidingMemTable(Boolean.parseBoolean(properties
+ .getProperty("enable_sliding_mem_table",
+ Boolean.toString(conf.isEnableSlidingMemTable()))));
+
+ conf.setFlushWaitTime(Integer.parseInt(properties
+ .getProperty("flush_wait_time",
+ Integer.toString(conf.getFlushWaitTime()))));
+
conf.setMergeChunkPointNumberThreshold(Integer.parseInt(properties
.getProperty("merge_chunk_point_number",
Integer.toString(conf.getMergeChunkPointNumberThreshold()))));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 0d49f2c..b102d03 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -19,11 +19,14 @@
package org.apache.iotdb.db.engine.flush;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.TimeUnit;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.flush.pool.FlushSubTaskPoolManager;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileProcessor;
import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.rescon.SystemInfo;
import org.apache.iotdb.db.service.IService;
import org.apache.iotdb.db.service.JMXService;
import org.apache.iotdb.db.service.ServiceType;
@@ -86,6 +89,24 @@ public class FlushManager implements FlushManagerMBean, IService {
@Override
public void runMayThrow() {
TsFileProcessor tsFileProcessor = tsFileProcessorQueue.poll();
+ if (tsFileProcessor.isSequence() && IoTDBDescriptor.getInstance().getConfig()
+ .isEnableSlidingMemTable() && !SystemInfo.getInstance().forceFlush()) {
+ long startTime = System.currentTimeMillis();
+ while (!tsFileProcessor.isShouldClose()
+ && System.currentTimeMillis() - startTime < IoTDBDescriptor.getInstance().getConfig()
+ .getFlushWaitTime()) {
+ // wait
+ try {
+ TimeUnit.MILLISECONDS
+ .sleep(IoTDBDescriptor.getInstance().getConfig().getWaitingTimeWhenInsertBlocked());
+ } catch (InterruptedException e) {
+ logger.error("flush mem table wait error", e);
+ }
+ }
+ }
+ tsFileProcessor.setFlushingMemTable(null);
+ tsFileProcessor.setFlushMemTableAlive(false);
+ tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
tsFileProcessor.flushOneMemTable();
tsFileProcessor.setManagedByFlushManager(false);
if (logger.isDebugEnabled()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2b49583..b431823 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -148,6 +148,11 @@ public class StorageGroupProcessor {
*/
private final ReadWriteLock insertLock = new ReentrantReadWriteLock();
/**
+ * a read write lock for guaranteeing concurrent safety when latestTimeForEachDevice is updated
+ * using flushingLatestTimeForEachDevice
+ */
+ private final ReadWriteLock flushTimeUpdateLock = new ReentrantReadWriteLock();
+ /**
* closeStorageGroupCondition is used to wait for all currently closing TsFiles to be done.
*/
private final Object closeStorageGroupCondition = new Object();
@@ -177,10 +182,15 @@ public class StorageGroupProcessor {
private CopyOnReadLinkedList<TsFileProcessor> closingUnSequenceTsFileProcessor = new CopyOnReadLinkedList<>();
/*
+ * time partition id -> map, which contains
+ * device -> flushing time bound. When a memory table is marked as to be flushed, the timestamps
+ * in latestTimeForEachDevice are cached here as the flushing time bound, and they are used to
+ * update partitionLatestFlushedTimeForEachDevice when the flush is actually issued.
+ */
+ private Map<Long, Map<String, Long>> flushingLatestTimeForEachDevice = new HashMap<>();
+ /*
* time partition id -> map, which contains
* device -> global latest timestamp of each device latestTimeForEachDevice caches non-flushed
- * changes upon timestamps of each device, and is used to update partitionLatestFlushedTimeForEachDevice
- * when a flush is issued.
+ * changes upon timestamps of each device
*/
private Map<Long, Map<String, Long>> latestTimeForEachDevice = new HashMap<>();
/**
@@ -653,6 +663,7 @@ public class StorageGroupProcessor {
throw new OutOfTTLException(insertRowPlan.getTime(), (System.currentTimeMillis() - dataTTL));
}
writeLock();
+ flushUpdateLock();
try {
// init map
long timePartitionId = StorageEngine.getTimePartition(insertRowPlan.getTime());
@@ -676,6 +687,7 @@ public class StorageGroupProcessor {
} finally {
writeUnlock();
+ flushUpdateUnLock();
}
}
@@ -869,6 +881,10 @@ public class StorageGroupProcessor {
return;
}
+ if (sequence && config.isEnableSlidingMemTable()) {
+ insertRowPlan
+ .setToFlushingMemTable(isInsertToFlushingMemTable(timePartitionId, insertRowPlan));
+ }
// insert TsFileProcessor
tsFileProcessor.insert(insertRowPlan);
@@ -891,6 +907,17 @@ public class StorageGroupProcessor {
}
}
+ /**
+  * Judge whether an insert plan should be inserted into the flushing memtable.
+  *
+  * @param timePartitionId id of the time partition the row belongs to
+  * @param insertRowPlan the row being inserted
+  * @return true iff a flushing time bound is cached for the row's device in this partition and
+  *     the row's timestamp is strictly smaller than that bound
+  */
+ private boolean isInsertToFlushingMemTable(long timePartitionId, InsertRowPlan insertRowPlan) {
+   // one lookup instead of containsKey + get; also covers a key that is mapped to null
+   Map<String, Long> flushingTimeBounds = flushingLatestTimeForEachDevice.get(timePartitionId);
+   if (flushingTimeBounds == null) {
+     return false;
+   }
+   String device = insertRowPlan.getDeviceId().getFullPath();
+   return flushingTimeBounds.getOrDefault(device, Long.MIN_VALUE) > insertRowPlan.getTime();
+ }
+
private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTime) {
if (!IoTDBDescriptor.getInstance().getConfig().isLastCacheEnabled()) {
return;
@@ -916,7 +943,7 @@ public class StorageGroupProcessor {
public void asyncFlushMemTableInTsFileProcessor(TsFileProcessor tsFileProcessor) {
writeLock();
try {
- if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
+ if (!closingSequenceTsFileProcessor.contains(tsFileProcessor) &&
!closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
fileFlushPolicy.apply(this, tsFileProcessor, tsFileProcessor.isSequence());
}
@@ -1092,7 +1119,7 @@ public class StorageGroupProcessor {
public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
- if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
+ if (closingSequenceTsFileProcessor.contains(tsFileProcessor) ||
closingUnSequenceTsFileProcessor.contains(tsFileProcessor)) {
return;
}
@@ -1372,6 +1399,17 @@ public class StorageGroupProcessor {
insertLock.writeLock().unlock();
}
+ /**
+  * Acquire the write lock of flushTimeUpdateLock. Does nothing when the sliding memory table
+  * is disabled in the config.
+  */
+ public void flushUpdateLock() {
+   if (!config.isEnableSlidingMemTable()) {
+     return;
+   }
+   flushTimeUpdateLock.writeLock().lock();
+ }
+
+ /**
+  * Release the write lock of flushTimeUpdateLock if the current thread holds it.
+  *
+  * <p>The decision is based on actual lock ownership rather than on re-reading the config flag:
+  * if the flag is changed between flushUpdateLock() and this call, re-reading it would either
+  * leave the lock held forever or unlock a lock that was never taken.
+  */
+ public void flushUpdateUnLock() {
+   // flushTimeUpdateLock is created as a ReentrantReadWriteLock, so the cast is safe
+   if (((ReentrantReadWriteLock) flushTimeUpdateLock).isWriteLockedByCurrentThread()) {
+     flushTimeUpdateLock.writeLock().unlock();
+   }
+ }
/**
* @param tsFileResources includes sealed and unsealed tsfile resources
@@ -1616,27 +1654,44 @@ public class StorageGroupProcessor {
}
private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
- // update the largest timestamp in the last flushing memtable
- Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
- .get(processor.getTimeRangeId());
-
- if (curPartitionDeviceLatestTime == null) {
- logger.warn("Partition: {} does't have latest time for each device. "
- + "No valid record is written into memtable. Flushing tsfile is: {}",
- processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
- return false;
- }
+ flushUpdateLock();
+ try {
+ // update the largest timestamp in the last flushing memtable
+ Map<String, Long> curPartitionDeviceLatestTime = latestTimeForEachDevice
+ .get(processor.getTimeRangeId());
+
+ if (curPartitionDeviceLatestTime == null) {
+ logger.warn("Partition: {} does't have latest time for each device. "
+ + "No valid record is written into memtable. Flushing tsfile is: {}",
+ processor.getTimeRangeId(), processor.getTsFileResource().getTsFile());
+ return false;
+ }
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
- entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice
- .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ if (processor.isFlushMemTableAlive()) {
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ flushingLatestTimeForEachDevice
+ .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ }
+ } else {
+ if (processor.isSequence()) {
+ curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+ .get(processor.getTimeRangeId());
+ }
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice
+ .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ }
+ }
}
+ } finally {
+ flushUpdateUnLock();
}
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index b810774..32dec07 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -101,13 +100,18 @@ public class TsFileProcessor {
*/
private volatile boolean managedByFlushManager;
private final ReadWriteLock flushQueryLock = new ReentrantReadWriteLock();
+
/**
* It is set by the StorageGroupProcessor and checked by flush threads. (If shouldClose == true
* and its flushingMemTables are all flushed, then the flush thread will close this file.)
*/
private volatile boolean shouldClose;
+
+ private IMemTable flushingMemTable;
private IMemTable workMemTable;
+ private boolean isFlushMemTableAlive = false;
+
private final VersionController versionController;
/**
@@ -180,8 +184,11 @@ public class TsFileProcessor {
blockInsertionIfReject();
checkMemCostAndAddToTspInfo(insertRowPlan);
}
-
- workMemTable.insert(insertRowPlan);
+ if (isFlushMemTableAlive && insertRowPlan.isToFlushingMemTable()){
+ flushingMemTable.insert(insertRowPlan);
+ } else {
+ workMemTable.insert(insertRowPlan);
+ }
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
try {
getLogNode().write(insertRowPlan);
@@ -258,7 +265,7 @@ public class TsFileProcessor {
long textDataIncrement = 0L;
long chunkMetadataIncrement = 0L;
String deviceId = insertRowPlan.getDeviceId().getFullPath();
- long unsealedResourceIncrement =
+ long unsealedResourceIncrement =
tsFileResource.estimateRamIncrement(deviceId);
for (int i = 0; i < insertRowPlan.getDataTypes().length; i++) {
// skip failed Measurements
@@ -496,11 +503,12 @@ public class TsFileProcessor {
try {
if (logger.isInfoEnabled()) {
- if (workMemTable != null) {
+ if (flushingMemTable != null || workMemTable != null) {
logger.info(
- "{}: flush a working memtable in async close tsfile {}, memtable size: {}, tsfile size: {}",
+ "{}: flush a memtable in async close tsfile {}, flushing memtable size: {}, "
+ + "working memtable size: {}, tsfile size: {}",
storageGroupName, tsFileResource.getTsFile().getAbsolutePath(),
- workMemTable.memSize(),
+ flushingMemTable == null ? 0 : flushingMemTable.memSize(), workMemTable.memSize(),
tsFileResource.getTsFileSize());
} else {
logger.info("{}: flush a NotifyFlushMemTable in async close tsfile {}, tsfile size: {}",
@@ -522,14 +530,18 @@ public class TsFileProcessor {
// we have to add the memtable into flushingList first and then set the shouldClose tag.
// see https://issues.apache.org/jira/browse/IOTDB-510
- IMemTable tmpMemTable = workMemTable == null || workMemTable.memSize() == 0
- ? new NotifyFlushMemTable()
+// IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
+// ? new NotifyFlushMemTable()
+// : flushingMemTable;
+ IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
+ ? new NotifyFlushMemTable()
: workMemTable;
try {
// When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
// flushing memTable in System module.
- addAMemtableIntoFlushingList(tmpMemTable);
+// addAMemtableIntoFlushingList(tmpFlushMemTable);
+ addAMemtableIntoFlushingList(tmpWorkMemTable);
shouldClose = true;
tsFileResource.setCloseFlag();
} catch (Exception e) {
@@ -557,7 +569,11 @@ public class TsFileProcessor {
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
- tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+ if (flushingMemTable == null) {
+ tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
+ } else {
+ tmpMemTable = flushingMemTable;
+ }
if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
storageGroupName, tsFileResource.getTsFile().getName());
@@ -606,6 +622,11 @@ public class TsFileProcessor {
}
logger.info("Async flush a memtable to tsfile: {}",
tsFileResource.getTsFile().getAbsolutePath());
+ if (config.isEnableSlidingMemTable()){
+ while (flushingMemTable != null) {
+ TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+ }
+ }
addAMemtableIntoFlushingList(workMemTable);
} catch (Exception e) {
logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
@@ -625,8 +646,7 @@ public class TsFileProcessor {
* flushManager again.
*/
private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
- if (!tobeFlushed.isSignalMemTable() &&
- (!updateLatestFlushTimeCallback.call(this) || tobeFlushed.memSize() == 0)) {
+ if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
return;
@@ -649,12 +669,16 @@ public class TsFileProcessor {
if (!tobeFlushed.isSignalMemTable()) {
totalMemTableSize += tobeFlushed.memSize();
}
+ if (sequence && config.isEnableSlidingMemTable()) {
+ flushingMemTable = workMemTable;
+ isFlushMemTableAlive = true;
+ }
+ updateLatestFlushTimeCallback.call(this);
workMemTable = null;
shouldFlush = false;
FlushManager.getInstance().registerTsFileProcessor(this);
}
-
/**
* put back the memtable to MemTablePool and make metadata in writer visible
*/
@@ -1004,4 +1028,25 @@ public class TsFileProcessor {
public void addCloseFileListeners(Collection<CloseFileListener> listeners) {
closeFileListeners.addAll(listeners);
}
+
+ /**
+  * @param flushingMemTable the memtable to record as the flushing memtable, or null to clear it
+  */
+ public void setFlushingMemTable(IMemTable flushingMemTable) {
+   this.flushingMemTable = flushingMemTable;
+ }
+
+
+ /**
+  * @return the callback this processor invokes to update the latest flush time
+  */
+ public UpdateEndTimeCallBack getUpdateLatestFlushTimeCallback() {
+   return this.updateLatestFlushTimeCallback;
+ }
+
+ /**
+  * @return the current value of the isFlushMemTableAlive flag
+  */
+ public boolean isFlushMemTableAlive() {
+   return this.isFlushMemTableAlive;
+ }
+
+ /**
+  * @param flushMemTableAlive new value of the isFlushMemTableAlive flag
+  */
+ public void setFlushMemTableAlive(boolean flushMemTableAlive) {
+   this.isFlushMemTableAlive = flushMemTableAlive;
+ }
+
+ /**
+  * @return the shouldClose flag, which is set by the StorageGroupProcessor and checked by
+  *     flush threads
+  */
+ public boolean isShouldClose() {
+   return this.shouldClose;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
index b41311e..65ee5fb 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/InsertRowPlan.java
@@ -61,6 +61,9 @@ public class InsertRowPlan extends InsertPlan {
private List<Object> failedValues;
+ // marks whether this insert should be written into the flushingMemTable instead of the workMemTable
+ private boolean toFlushingMemTable = false;
+
public InsertRowPlan() {
super(OperatorType.INSERT);
}
@@ -464,4 +467,12 @@ public class InsertRowPlan extends InsertPlan {
failedValues = null;
return this;
}
+
+ /**
+  * @return whether this insert is marked to use the flushingMemTable
+  */
+ public boolean isToFlushingMemTable() {
+   return this.toFlushingMemTable;
+ }
+
+ /**
+  * @param toFlushingMemTable whether this insert should use the flushingMemTable
+  */
+ public void setToFlushingMemTable(boolean toFlushingMemTable) {
+   this.toFlushingMemTable = toFlushingMemTable;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
index 74c00f0..c7eacff 100644
--- a/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/rescon/SystemInfo.java
@@ -45,6 +45,7 @@ public class SystemInfo {
private Map<StorageGroupInfo, Long> reportedSgMemCostMap = new ConcurrentHashMap<>();
private static final double FLUSH_PROPORTION = config.getFlushProportion();
+ private static final double FORCE_FLUSH_PROPORTION = config.getForceFlushProportion();
private static final double REJECT_PROPORTION = config.getRejectProportion();
/**
@@ -202,4 +203,8 @@ public class SystemInfo {
private static SystemInfo instance = new SystemInfo();
}
+
+ public boolean forceFlush(){
+ return totalSgMemCost.get() >= config.getAllocateMemoryForWrite() * FORCE_FLUSH_PROPORTION;
+ }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 6e0efbc..87bf8c1 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -137,6 +137,31 @@ public class StorageGroupProcessorTest {
}
@Test
+ public void testSlidingMemTable()
+     throws WriteProcessException, IOException, MetadataException {
+   // insert timestamps 5..10
+   for (int time = 5; time <= 10; time++) {
+     TSRecord inOrderRecord = new TSRecord(time, deviceId);
+     inOrderRecord.addTuple(
+         DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time)));
+     processor.insert(new InsertRowPlan(inOrderRecord));
+   }
+
+   // request an async flush on every working sequence TsFileProcessor
+   for (TsFileProcessor tsfileProcessor : processor.getWorkSequenceTsFileProcessors()) {
+     tsfileProcessor.asyncFlush();
+   }
+
+   // insert timestamps 1..3, which are smaller than those inserted before the flush request
+   for (int time = 1; time <= 3; time++) {
+     TSRecord lateRecord = new TSRecord(time, deviceId);
+     lateRecord.addTuple(
+         DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(time)));
+     processor.insert(new InsertRowPlan(lateRecord));
+   }
+   processor.syncCloseAllWorkingTsFileProcessors();
+ }
+
+ @Test
public void testSequenceSyncClose()
throws WriteProcessException, QueryProcessException, IllegalPathException {
for (int j = 1; j <= 10; j++) {