You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/11 03:45:12 UTC
[iotdb] 02/06: finish sliding window version 2
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 17a465612befe912648ce699ab57ff7aa1fd5483
Author: EJTTianyu <16...@qq.com>
AuthorDate: Mon Dec 7 15:38:44 2020 +0800
finish sliding window version 2
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +--
.../engine/storagegroup/StorageGroupProcessor.java | 35 +++++++++++--------
.../db/engine/storagegroup/TsFileProcessor.java | 40 +++++++++++++---------
3 files changed, 45 insertions(+), 34 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 33546f0..c5ad200 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -216,9 +216,9 @@ public class IoTDBConfig {
private int estimatedSeriesSize = 300;
/**
- * Whether to enable sliding memory table
+ * Whether to enable the sliding memory table. The default value is false to reduce the CI time.
*/
- private boolean enableSlidingMemTable = true;
+ private boolean enableSlidingMemTable = false;
/**
* Save the flushing memtable in the memory during the period, can help reduce the unseq ratio, Unit: millis.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 32c3ac7..b670184 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1699,22 +1699,13 @@ public class StorageGroupProcessor {
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
- } else {
- if (processor.isSequence()) {
- curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
- .get(processor.getTimeRangeId());
- }
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
- entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice
- .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
- }
+ if (processor.isUpdateLatestTime()) {
+ updateLatestTime(processor, curPartitionDeviceLatestTime);
}
+ } else {
+ curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+ .get(processor.getTimeRangeId());
+ updateLatestTime(processor, curPartitionDeviceLatestTime);
}
} finally {
flushUpdateUnLock();
@@ -1722,6 +1713,20 @@ public class StorageGroupProcessor {
return true;
}
+ private void updateLatestTime(TsFileProcessor processor, Map<String, Long> curPartitionDeviceLatestTime){
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice
+ .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
/**
* used for upgrading
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 3cd1fbe..0d70d54 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -109,6 +109,7 @@ public class TsFileProcessor {
private IMemTable workMemTable;
private boolean isFlushMemTableAlive = false;
+ private boolean updateLatestTime = false;
private final VersionController versionController;
@@ -521,9 +522,6 @@ public class TsFileProcessor {
// we have to add the memtable into flushingList first and then set the shouldClose tag.
// see https://issues.apache.org/jira/browse/IOTDB-510
-// IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
-// ? new NotifyFlushMemTable()
-// : flushingMemTable;
IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
? new NotifyFlushMemTable()
: workMemTable;
@@ -531,7 +529,7 @@ public class TsFileProcessor {
try {
// When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
// flushing memTable in System module.
- addAMemtableIntoFlushingList(tmpWorkMemTable);
+ addAMemtableIntoFlushingList(tmpWorkMemTable, true);
logger.info("Memtable {} has been added to flushing list", tmpWorkMemTable);
shouldClose = true;
} catch (Exception e) {
@@ -559,16 +557,12 @@ public class TsFileProcessor {
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
- if (flushingMemTable == null) {
- tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
- } else {
- tmpMemTable = flushingMemTable;
- }
+ tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
storageGroupName, tsFileResource.getTsFile().getName());
}
- addAMemtableIntoFlushingList(tmpMemTable);
+ addAMemtableIntoFlushingList(tmpMemTable, false);
} finally {
flushQueryLock.writeLock().unlock();
if (logger.isDebugEnabled()) {
@@ -601,6 +595,15 @@ public class TsFileProcessor {
* put the working memtable into flushing list and set the working memtable to null
*/
public void asyncFlush() {
+ if (config.isEnableSlidingMemTable()){
+ while (isManagedByFlushManager() && flushingMemTable != null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger
@@ -612,12 +615,8 @@ public class TsFileProcessor {
}
logger.info("Async flush a memtable to tsfile: {}",
tsFileResource.getTsFile().getAbsolutePath());
- if (config.isEnableSlidingMemTable()){
- while (flushingMemTable != null) {
- TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
- }
- }
- addAMemtableIntoFlushingList(workMemTable);
+
+ addAMemtableIntoFlushingList(workMemTable, false);
} catch (Exception e) {
logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
tsFileResource.getTsFile().getName(), e);
@@ -635,7 +634,7 @@ public class TsFileProcessor {
* queue, set the current working memtable as null and then register the tsfileProcessor into the
* flushManager again.
*/
- private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
+ private void addAMemtableIntoFlushingList(IMemTable tobeFlushed, boolean isClosing) throws IOException {
if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
@@ -663,6 +662,9 @@ public class TsFileProcessor {
flushingMemTable = workMemTable;
isFlushMemTableAlive = true;
}
+ if(isClosing){
+ updateLatestTime = true;
+ }
updateLatestFlushTimeCallback.call(this);
workMemTable = null;
shouldFlush = false;
@@ -1049,4 +1051,8 @@ public class TsFileProcessor {
public boolean isShouldClose() {
return shouldClose;
}
+
+ public boolean isUpdateLatestTime() {
+ return updateLatestTime;
+ }
}