You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/07 07:39:15 UTC
[iotdb] branch dev_sliding_mem_table updated: finish sliding window version 2
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev_sliding_mem_table by this push:
new 4ed7dd8 finish sliding window version 2
4ed7dd8 is described below
commit 4ed7dd87b4dbddd2b3380956f9bbdb86ffdb62a1
Author: EJTTianyu <16...@qq.com>
AuthorDate: Mon Dec 7 15:38:44 2020 +0800
finish sliding window version 2
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 4 +--
.../engine/storagegroup/StorageGroupProcessor.java | 35 ++++++++++--------
.../db/engine/storagegroup/TsFileProcessor.java | 41 ++++++++++++----------
3 files changed, 45 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a337152..13e9d3b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -210,9 +210,9 @@ public class IoTDBConfig {
private int estimatedSeriesSize = 300;
/**
- * Whether to enable sliding memory table
+ * Whether to enable the sliding memory table. The default value is false in order to reduce the CI time.
*/
- private boolean enableSlidingMemTable = true;
+ private boolean enableSlidingMemTable = false;
/**
* Keep the flushing memtable in memory for this period, which can help reduce the unseq ratio. Unit: millis.
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b431823..f3896a6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1673,22 +1673,13 @@ public class StorageGroupProcessor {
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
- } else {
- if (processor.isSequence()) {
- curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
- .get(processor.getTimeRangeId());
- }
- for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
- .put(entry.getKey(), entry.getValue());
- updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
- entry.getKey(), entry.getValue());
- if (globalLatestFlushedTimeForEachDevice
- .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
- globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
- }
+ if (processor.isUpdateLatestTime()) {
+ updateLatestTime(processor, curPartitionDeviceLatestTime);
}
+ } else {
+ curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
+ .get(processor.getTimeRangeId());
+ updateLatestTime(processor, curPartitionDeviceLatestTime);
}
} finally {
flushUpdateUnLock();
@@ -1696,6 +1687,20 @@ public class StorageGroupProcessor {
return true;
}
+ private void updateLatestTime(TsFileProcessor processor, Map<String, Long> curPartitionDeviceLatestTime){
+ for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
+ .put(entry.getKey(), entry.getValue());
+ updateNewlyFlushedPartitionLatestFlushedTimeForEachDevice(processor.getTimeRangeId(),
+ entry.getKey(), entry.getValue());
+ if (globalLatestFlushedTimeForEachDevice
+ .getOrDefault(entry.getKey(), Long.MIN_VALUE) < entry.getValue()) {
+ globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
/**
* used for upgrading
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 32dec07..a4d0a0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -111,6 +111,7 @@ public class TsFileProcessor {
private IMemTable workMemTable;
private boolean isFlushMemTableAlive = false;
+ private boolean updateLatestTime = false;
private final VersionController versionController;
@@ -530,9 +531,6 @@ public class TsFileProcessor {
// we have to add the memtable into flushingList first and then set the shouldClose tag.
// see https://issues.apache.org/jira/browse/IOTDB-510
-// IMemTable tmpFlushMemTable = flushingMemTable == null || flushingMemTable.memSize() == 0
-// ? new NotifyFlushMemTable()
-// : flushingMemTable;
IMemTable tmpWorkMemTable = workMemTable == null || workMemTable.memSize() == 0
? new NotifyFlushMemTable()
: workMemTable;
@@ -540,8 +538,7 @@ public class TsFileProcessor {
try {
// When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
// flushing memTable in System module.
-// addAMemtableIntoFlushingList(tmpFlushMemTable);
- addAMemtableIntoFlushingList(tmpWorkMemTable);
+ addAMemtableIntoFlushingList(tmpWorkMemTable, true);
shouldClose = true;
tsFileResource.setCloseFlag();
} catch (Exception e) {
@@ -569,16 +566,12 @@ public class TsFileProcessor {
.debug(FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
- if (flushingMemTable == null) {
- tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
- } else {
- tmpMemTable = flushingMemTable;
- }
+ tmpMemTable = workMemTable == null ? new NotifyFlushMemTable() : workMemTable;
if (logger.isDebugEnabled() && tmpMemTable.isSignalMemTable()) {
logger.debug("{}: {} add a signal memtable into flushing memtable list when sync flush",
storageGroupName, tsFileResource.getTsFile().getName());
}
- addAMemtableIntoFlushingList(tmpMemTable);
+ addAMemtableIntoFlushingList(tmpMemTable, false);
} finally {
flushQueryLock.writeLock().unlock();
if (logger.isDebugEnabled()) {
@@ -611,6 +604,15 @@ public class TsFileProcessor {
* put the working memtable into flushing list and set the working memtable to null
*/
public void asyncFlush() {
+ if (config.isEnableSlidingMemTable()){
+ while (isManagedByFlushManager() && flushingMemTable != null) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
flushQueryLock.writeLock().lock();
if (logger.isDebugEnabled()) {
logger
@@ -622,12 +624,8 @@ public class TsFileProcessor {
}
logger.info("Async flush a memtable to tsfile: {}",
tsFileResource.getTsFile().getAbsolutePath());
- if (config.isEnableSlidingMemTable()){
- while (flushingMemTable != null) {
- TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
- }
- }
- addAMemtableIntoFlushingList(workMemTable);
+
+ addAMemtableIntoFlushingList(workMemTable, false);
} catch (Exception e) {
logger.error("{}: {} add a memtable into flushing list failed", storageGroupName,
tsFileResource.getTsFile().getName(), e);
@@ -645,7 +643,7 @@ public class TsFileProcessor {
* queue, set the current working memtable as null and then register the tsfileProcessor into the
* flushManager again.
*/
- private void addAMemtableIntoFlushingList(IMemTable tobeFlushed) throws IOException {
+ private void addAMemtableIntoFlushingList(IMemTable tobeFlushed, boolean isClosing) throws IOException {
if (!tobeFlushed.isSignalMemTable() && tobeFlushed.memSize() == 0) {
logger.warn("This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
storageGroupName, tsFileResource.getTsFile().getName(), tobeFlushed.getMemTableMap());
@@ -673,6 +671,9 @@ public class TsFileProcessor {
flushingMemTable = workMemTable;
isFlushMemTableAlive = true;
}
+ if(isClosing){
+ updateLatestTime = true;
+ }
updateLatestFlushTimeCallback.call(this);
workMemTable = null;
shouldFlush = false;
@@ -1049,4 +1050,8 @@ public class TsFileProcessor {
public boolean isShouldClose() {
return shouldClose;
}
+
+ public boolean isUpdateLatestTime() {
+ return updateLatestTime;
+ }
}