You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/11 03:45:15 UTC
[iotdb] 05/06: finish sliding mem table version 5
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 066a3a749b16440fe68df9d122180acae5c4859f
Author: EJTTianyu <16...@qq.com>
AuthorDate: Wed Dec 9 18:56:43 2020 +0800
finish sliding mem table version 5
---
.../org/apache/iotdb/db/engine/flush/FlushManager.java | 6 ++----
.../db/engine/storagegroup/StorageGroupProcessor.java | 14 ++++++++------
.../iotdb/db/engine/storagegroup/TsFileProcessor.java | 11 ++++-------
.../db/engine/storagegroup/StorageGroupProcessorTest.java | 12 ++++++++----
.../iotdb/db/engine/storagegroup/TsFileProcessorTest.java | 8 ++++----
5 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 2a40384..b4041d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -108,10 +108,8 @@ public class FlushManager implements FlushManagerMBean, IService {
}
}
if (tsFileProcessor.isSequence() && IoTDBDescriptor.getInstance().getConfig()
- .isEnableSlidingMemTable()) {
- tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
- tsFileProcessor.setFlushingMemTable(null);
- tsFileProcessor.setFlushMemTableAlive(false);
+ .isEnableSlidingMemTable() && tsFileProcessor.isFlushMemTableAlive()) {
+ tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor, true);
}
tsFileProcessor.flushOneMemTable();
tsFileProcessor.setManagedByFlushManager(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index b3b1f91..7025017 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1676,11 +1676,11 @@ public class StorageGroupProcessor {
}
}
- private boolean unsequenceFlushCallback(TsFileProcessor processor) {
+ private boolean unsequenceFlushCallback(TsFileProcessor processor, boolean updateLatest) {
return true;
}
- private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
+ private boolean updateLatestFlushTimeCallback(TsFileProcessor processor, boolean updateLatest) {
flushUpdateLock();
try {
// update the largest timestamp in the last flushing memtable
@@ -1694,14 +1694,14 @@ public class StorageGroupProcessor {
return false;
}
- if (processor.isFlushMemTableAlive()) {
- // if flushing mem table is alive, use latestTimeForEachDevice to update flushingLatestTimeForEachDevice
+ if (config.isEnableSlidingMemTable() && !updateLatest) {
+ // use latestTimeForEachDevice to update flushingLatestTimeForEachDevice
for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
flushingLatestTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
- // when the processor is closing, update partitionLatestFlushedTimeForEachDevice
+ // when the processor is closing, update partitionLatestFlushedTimeForEachDevice immediately
if (processor.isUpdateLatestTime()) {
updateLatestTime(processor, curPartitionDeviceLatestTime);
}
@@ -1710,6 +1710,8 @@ public class StorageGroupProcessor {
if (config.isEnableSlidingMemTable()) {
curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
.get(processor.getTimeRangeId());
+ processor.setFlushingMemTable(null);
+ processor.setFlushMemTableAlive(false);
}
updateLatestTime(processor, curPartitionDeviceLatestTime);
}
@@ -2574,7 +2576,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface UpdateEndTimeCallBack {
- boolean call(TsFileProcessor caller);
+ boolean call(TsFileProcessor caller, boolean updateLatest);
}
@FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index c09d660..547d0e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -596,12 +596,9 @@ public class TsFileProcessor {
*/
public void asyncFlush() {
if (config.isEnableSlidingMemTable()){
- while (flushingMemTable != null) {
- try {
- TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
- } catch (InterruptedException e) {
- logger.error("async flush failed because the flushing mem table is still alive", e);
- }
+ if (flushingMemTable != null) {
+ // if previous flushing mem table is still alive, update partitionLatestFlushedTimeForEachDevice
+ updateLatestFlushTimeCallback.call(this, true);
}
}
flushQueryLock.writeLock().lock();
@@ -665,7 +662,7 @@ public class TsFileProcessor {
if(isClosing){
updateLatestTime = true;
}
- updateLatestFlushTimeCallback.call(this);
+ updateLatestFlushTimeCallback.call(this, false);
workMemTable = null;
shouldFlush = false;
FlushManager.getInstance().registerTsFileProcessor(this);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 707b76f..d786d1f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -139,9 +139,10 @@ public class StorageGroupProcessorTest {
@Test
public void testSlidingMemTable()
- throws WriteProcessException, IOException, MetadataException {
- TSRecord record;
+ throws WriteProcessException, QueryProcessException, MetadataException {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSlidingMemTable(true);
+ TSRecord record;
for (int j = 5; j <= 10; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -157,9 +158,12 @@ public class StorageGroupProcessorTest {
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertRowPlan(record));
}
-// System.exit(0);
processor.syncCloseAllWorkingTsFileProcessors();
-
+ QueryDataSource queryDataSource = processor
+ .query(new PartialPath(deviceId), measurementId, context,
+ null, null);
+ Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ IoTDBDescriptor.getInstance().getConfig().setEnableSlidingMemTable(false);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index 9ba80bb..e73c135 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -87,7 +87,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndFlush begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -143,7 +143,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -226,7 +226,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -267,7 +267,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);