You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/12/09 10:57:15 UTC
[iotdb] branch dev_sliding_mem_table updated: finish sliding mem table version 5
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch dev_sliding_mem_table
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev_sliding_mem_table by this push:
new 437953d finish sliding mem table version 5
437953d is described below
commit 437953d433ab52155f0bc935e49b9a58daba19b9
Author: EJTTianyu <16...@qq.com>
AuthorDate: Wed Dec 9 18:56:43 2020 +0800
finish sliding mem table version 5
---
.../org/apache/iotdb/db/engine/flush/FlushManager.java | 6 ++----
.../db/engine/storagegroup/StorageGroupProcessor.java | 14 ++++++++------
.../iotdb/db/engine/storagegroup/TsFileProcessor.java | 11 ++++-------
.../db/engine/storagegroup/StorageGroupProcessorTest.java | 12 ++++++++----
.../iotdb/db/engine/storagegroup/TsFileProcessorTest.java | 8 ++++----
5 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
index 2672f47..14389a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/FlushManager.java
@@ -105,10 +105,8 @@ public class FlushManager implements FlushManagerMBean, IService {
}
}
if (tsFileProcessor.isSequence() && IoTDBDescriptor.getInstance().getConfig()
- .isEnableSlidingMemTable()) {
- tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor);
- tsFileProcessor.setFlushingMemTable(null);
- tsFileProcessor.setFlushMemTableAlive(false);
+ .isEnableSlidingMemTable() && tsFileProcessor.isFlushMemTableAlive()) {
+ tsFileProcessor.getUpdateLatestFlushTimeCallback().call(tsFileProcessor, true);
}
tsFileProcessor.flushOneMemTable();
tsFileProcessor.setManagedByFlushManager(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index f717637..d094ad9 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1650,11 +1650,11 @@ public class StorageGroupProcessor {
}
}
- private boolean unsequenceFlushCallback(TsFileProcessor processor) {
+ private boolean unsequenceFlushCallback(TsFileProcessor processor, boolean updateLatest) {
return true;
}
- private boolean updateLatestFlushTimeCallback(TsFileProcessor processor) {
+ private boolean updateLatestFlushTimeCallback(TsFileProcessor processor, boolean updateLatest) {
flushUpdateLock();
try {
// update the largest timestamp in the last flushing memtable
@@ -1668,14 +1668,14 @@ public class StorageGroupProcessor {
return false;
}
- if (processor.isFlushMemTableAlive()) {
- // if flushing mem table is alive, use latestTimeForEachDevice to update flushingLatestTimeForEachDevice
+ if (config.isEnableSlidingMemTable() && !updateLatest) {
+ // use latestTimeForEachDevice to update flushingLatestTimeForEachDevice
for (Entry<String, Long> entry : curPartitionDeviceLatestTime.entrySet()) {
flushingLatestTimeForEachDevice
.computeIfAbsent(processor.getTimeRangeId(), id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
- // when the processor is closing, update partitionLatestFlushedTimeForEachDevice
+ // when the processor is closing, update partitionLatestFlushedTimeForEachDevice immediately
if (processor.isUpdateLatestTime()) {
updateLatestTime(processor, curPartitionDeviceLatestTime);
}
@@ -1684,6 +1684,8 @@ public class StorageGroupProcessor {
if (config.isEnableSlidingMemTable()) {
curPartitionDeviceLatestTime = flushingLatestTimeForEachDevice
.get(processor.getTimeRangeId());
+ processor.setFlushingMemTable(null);
+ processor.setFlushMemTableAlive(false);
}
updateLatestTime(processor, curPartitionDeviceLatestTime);
}
@@ -2538,7 +2540,7 @@ public class StorageGroupProcessor {
@FunctionalInterface
public interface UpdateEndTimeCallBack {
- boolean call(TsFileProcessor caller);
+ boolean call(TsFileProcessor caller, boolean updateLatest);
}
@FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index e982933..95acd00 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -605,12 +605,9 @@ public class TsFileProcessor {
*/
public void asyncFlush() {
if (config.isEnableSlidingMemTable()){
- while (flushingMemTable != null) {
- try {
- TimeUnit.MILLISECONDS.sleep(waitingTimeWhenInsertBlocked);
- } catch (InterruptedException e) {
- logger.error("async flush failed because the flushing mem table is still alive", e);
- }
+ if (flushingMemTable != null) {
+ // if previous flushing mem table is still alive, update partitionLatestFlushedTimeForEachDevice
+ updateLatestFlushTimeCallback.call(this, true);
}
}
flushQueryLock.writeLock().lock();
@@ -674,7 +671,7 @@ public class TsFileProcessor {
if(isClosing){
updateLatestTime = true;
}
- updateLatestFlushTimeCallback.call(this);
+ updateLatestFlushTimeCallback.call(this, false);
workMemTable = null;
shouldFlush = false;
FlushManager.getInstance().registerTsFileProcessor(this);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 87bf8c1..172bd0b 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -138,9 +138,10 @@ public class StorageGroupProcessorTest {
@Test
public void testSlidingMemTable()
- throws WriteProcessException, IOException, MetadataException {
- TSRecord record;
+ throws WriteProcessException, QueryProcessException, MetadataException {
+ IoTDBDescriptor.getInstance().getConfig().setEnableSlidingMemTable(true);
+ TSRecord record;
for (int j = 5; j <= 10; j++) {
record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
@@ -156,9 +157,12 @@ public class StorageGroupProcessorTest {
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertRowPlan(record));
}
-// System.exit(0);
processor.syncCloseAllWorkingTsFileProcessors();
-
+ QueryDataSource queryDataSource = processor
+ .query(new PartialPath(deviceId), measurementId, context,
+ null, null);
+ Assert.assertEquals(1, queryDataSource.getSeqResources().size());
+ IoTDBDescriptor.getInstance().getConfig().setEnableSlidingMemTable(false);
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
index a1151c6..f89a24d 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorTest.java
@@ -87,7 +87,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndFlush begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -144,7 +144,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -227,7 +227,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -268,7 +268,7 @@ public class TsFileProcessorTest {
logger.info("testWriteAndRestoreMetadata begin..");
processor = new TsFileProcessor(storageGroup, SystemFileFactory.INSTANCE.getFile(filePath), sgInfo,
SysTimeVersionController.INSTANCE, this::closeTsFileProcessor,
- (tsFileProcessor) -> true, true);
+ (tsFileProcessor, updateLatest) -> true, true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);