You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/11/15 07:02:01 UTC
[iotdb] branch master updated: [IOTDB-4896] Fix error in closing a TsFileProcessor with an empty memtable (#7971)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e46df71015 [IOTDB-4896] Fix error in closing a TsFileProcessor with an empty memtable (#7971)
e46df71015 is described below
commit e46df7101597f6f295a72480260efec8a1bc202f
Author: Mrquan <50...@users.noreply.github.com>
AuthorDate: Tue Nov 15 15:01:56 2022 +0800
[IOTDB-4896] Fix error in closing a TsFileProcessor with an empty memtable (#7971)
---
.../iotdb/db/engine/storagegroup/DataRegion.java | 31 ++++++++-------
.../engine/storagegroup/HashLastFlushTimeMap.java | 3 +-
.../storagegroup/IDTableLastFlushTimeMap.java | 3 +-
.../db/engine/storagegroup/ILastFlushTimeMap.java | 2 +-
.../db/engine/storagegroup/TsFileProcessor.java | 46 ++++++++++++++++++----
.../db/engine/storagegroup/DataRegionTest.java | 23 ++++++++++-
.../engine/storagegroup/TsFileProcessorV2Test.java | 12 +++---
7 files changed, 85 insertions(+), 35 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 3bcf0a916e..e0864da1cc 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -1379,6 +1379,8 @@ public class DataRegion {
logger.info(
"Async close tsfile: {}",
tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
+
+ boolean isEmptyFile = tsFileProcessor.isEmpty();
if (sequence) {
closingSequenceTsFileProcessor.add(tsFileProcessor);
tsFileProcessor.asyncClose();
@@ -1401,6 +1403,16 @@ public class DataRegion {
timePartitionIdVersionControllerMap.remove(tsFileProcessor.getTimeRangeId());
}
}
+ if (isEmptyFile) {
+ try {
+ fsFactory.deleteIfExists(tsFileProcessor.getTsFileResource().getTsFile());
+ tsFileManager.remove(tsFileProcessor.getTsFileResource(), sequence);
+ } catch (IOException e) {
+ logger.error(
+ "Remove empty file {} error",
+ tsFileProcessor.getTsFileResource().getTsFile().getAbsolutePath());
+ }
+ }
}
/**
@@ -2059,7 +2071,7 @@ public class DataRegion {
}
}
- private boolean unsequenceFlushCallback(
+ private void unsequenceFlushCallback(
TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
TimePartitionManager.getInstance()
.updateAfterFlushing(
@@ -2068,21 +2080,11 @@ public class DataRegion {
systemFlushTime,
lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
workSequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
- return true;
}
- private boolean sequenceFlushCallback(
+ private void sequenceFlushCallback(
TsFileProcessor processor, Map<String, Long> updateMap, long systemFlushTime) {
- boolean res = lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
- if (!res) {
- logger.warn(
- "Partition: {} does't have latest time for each device. "
- + "No valid record is written into memtable. Flushing tsfile is: {}",
- processor.getTimeRangeId(),
- processor.getTsFileResource().getTsFile());
- return res;
- }
-
+ lastFlushTimeMap.updateLatestFlushTime(processor.getTimeRangeId(), updateMap);
TimePartitionManager.getInstance()
.updateAfterFlushing(
new DataRegionId(Integer.valueOf(dataRegionId)),
@@ -2090,7 +2092,6 @@ public class DataRegion {
systemFlushTime,
lastFlushTimeMap.getMemSize(processor.getTimeRangeId()),
workUnsequenceTsFileProcessors.get(processor.getTimeRangeId()) != null);
- return res;
}
/** used for upgrading */
@@ -3276,7 +3277,7 @@ public class DataRegion {
@FunctionalInterface
public interface UpdateEndTimeCallBack {
- boolean call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
+ void call(TsFileProcessor caller, Map<String, Long> updateMap, long systemFlushTime);
}
@FunctionalInterface
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
index b110cd6768..020077bb6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/HashLastFlushTimeMap.java
@@ -181,7 +181,7 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
}
@Override
- public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+ public void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
partitionLatestFlushedTimeForEachDevice
.computeIfAbsent(partitionId, id -> new HashMap<>())
@@ -193,7 +193,6 @@ public class HashLastFlushTimeMap implements ILastFlushTimeMap {
globalLatestFlushedTimeForEachDevice.put(entry.getKey(), entry.getValue());
}
}
- return true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
index 8e690290cd..6ad437fdc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/IDTableLastFlushTimeMap.java
@@ -113,7 +113,7 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
}
@Override
- public boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
+ public void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap) {
for (Map.Entry<String, Long> entry : updateMap.entrySet()) {
DeviceEntry deviceEntry = idTable.getDeviceEntry(entry.getKey());
deviceEntry.updateFlushTimeMap(partitionId, entry.getValue());
@@ -121,7 +121,6 @@ public class IDTableLastFlushTimeMap implements ILastFlushTimeMap {
deviceEntry.setGlobalFlushTime(entry.getValue());
}
}
- return true;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
index a98dd06e6f..f344b73f31 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/ILastFlushTimeMap.java
@@ -52,7 +52,7 @@ public interface ILastFlushTimeMap {
// region support upgrade methods
void applyNewlyFlushedTimeToFlushedTime();
- boolean updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
+ void updateLatestFlushTime(long partitionId, Map<String, Long> updateMap);
// endregion
// region query
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index 853484d51b..e59c593604 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -798,7 +798,6 @@ public class TsFileProcessor {
FLUSH_QUERY_WRITE_LOCKED, storageGroupName, tsFileResource.getTsFile().getName());
}
try {
-
if (logger.isInfoEnabled()) {
if (workMemTable != null) {
logger.info(
@@ -833,7 +832,7 @@ public class TsFileProcessor {
// we have to add the memtable into flushingList first and then set the shouldClose tag.
// see https://issues.apache.org/jira/browse/IOTDB-510
IMemTable tmpMemTable =
- workMemTable == null || workMemTable.memSize() == 0
+ workMemTable == null || workMemTable.getTotalPointsNum() == 0
? new NotifyFlushMemTable()
: workMemTable;
@@ -845,8 +844,12 @@ public class TsFileProcessor {
}
// When invoke closing TsFile after insert data to memTable, we shouldn't flush until invoke
// flushing memTable in System module.
- addAMemtableIntoFlushingList(tmpMemTable);
- logger.info("Memtable {} has been added to flushing list", tmpMemTable);
+ if (totalMemTableSize == 0 && tmpMemTable.getTotalPointsNum() == 0) {
+ endEmptyFile();
+ } else {
+ addAMemtableIntoFlushingList(tmpMemTable);
+ logger.info("Memtable {} has been added to flushing list", tmpMemTable);
+ }
shouldClose = true;
} catch (Exception e) {
logger.error(
@@ -956,10 +959,10 @@ public class TsFileProcessor {
lastTimeForEachDevice = tobeFlushed.getMaxTime();
tsFileResource.updateEndTime(lastTimeForEachDevice);
}
- if (!tobeFlushed.isSignalMemTable()
- && (tobeFlushed.memSize() == 0
- || (!updateLatestFlushTimeCallback.call(
- this, lastTimeForEachDevice, System.currentTimeMillis())))) {
+
+ updateLatestFlushTimeCallback.call(this, lastTimeForEachDevice, System.currentTimeMillis());
+
+ if (!tobeFlushed.isSignalMemTable() && tobeFlushed.getTotalPointsNum() == 0) {
logger.warn(
"This normal memtable is empty, skip it in flush. {}: {} Memetable info: {}",
storageGroupName,
@@ -1303,6 +1306,28 @@ public class TsFileProcessor {
writer = null;
}
+ /** Ends an empty TsFile: closes the writer, notifies close listeners, and reports to memory control if enabled. No delete call appears in this method. */
+ private void endEmptyFile() throws TsFileProcessorException, IOException {
+ logger.info("Start to end empty file {}", tsFileResource);
+
+ // The writer is no longer needed, so close it before notifying the close listeners.
+ // NOTE(review): the listeners presumably drop this processor from DataRegion's closing list and mark the resource closed - confirm.
+ writer.close();
+ for (CloseFileListener closeFileListener : closeFileListeners) {
+ closeFileListener.onClosed(this);
+ }
+ if (enableMemControl) { // processor info and data-region reporting are only touched under memory control
+ tsFileProcessorInfo.clear();
+ dataRegionInfo.closeTsFileProcessorAndReportToSystem(this);
+ }
+ logger.info(
+ "Storage group {} close and remove empty file {}",
+ storageGroupName,
+ tsFileResource.getTsFile().getAbsoluteFile());
+
+ writer = null; // drop the reference to the writer that was closed above
+ }
+
public boolean isManagedByFlushManager() {
return managedByFlushManager;
}
@@ -1496,6 +1521,11 @@ public class TsFileProcessor {
}
}
+ public boolean isEmpty() { // true only when both conditions below hold
+ return totalMemTableSize == 0 // the processor's totalMemTableSize counter is zero
+ && (workMemTable == null || workMemTable.getTotalPointsNum() == 0); // no working memtable, or it reports zero points
+ }
+
@TestOnly
public IMemTable getWorkMemTable() {
return workMemTable;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
index 8a5888601e..58f7311635 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/DataRegionTest.java
@@ -1087,7 +1087,7 @@ public class DataRegionTest {
// delete data which is in flushing memtable
dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 100, 0, null);
dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 50, 150, 0, null);
- dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 300, 0, null);
+ dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 190, 0, null);
dataRegion.syncCloseAllWorkingTsFileProcessors();
Assert.assertTrue(tsFileResource.getModFile().exists());
@@ -1150,6 +1150,27 @@ public class DataRegionTest {
Assert.assertEquals(3, tsFileResource.getModFile().getModifications().size());
}
+ @Test
+ public void testFlushingEmptyMemtable()
+ throws IllegalPathException, WriteProcessException, TriggerExecutionException, IOException {
+ for (int j = 100; j < 200; j++) { // insert one INT32 point per timestamp 100..199
+ TSRecord record = new TSRecord(j, deviceId);
+ record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
+ dataRegion.insert(buildInsertRowNodeByTSRecord(record));
+ }
+ TsFileResource tsFileResource = dataRegion.getTsFileManager().getTsFileList(true).get(0);
+
+ // delete with bounds 100 and 200 (presumably the time range - confirm), which spans every timestamp inserted above
+ dataRegion.deleteByDevice(new PartialPath("root.vehicle.d0.s0"), 100, 200, 0, null);
+
+ dataRegion.syncCloseAllWorkingTsFileProcessors(); // close everything, then check that no trace of the file is left
+ Assert.assertFalse(tsFileResource.getTsFile().exists());
+ Assert.assertFalse(tsFileResource.getModFile().exists());
+ Assert.assertFalse(dataRegion.getTsFileManager().contains(tsFileResource, true));
+ Assert.assertFalse(
+ dataRegion.getWorkSequenceTsFileProcessors().contains(tsFileResource.getProcessor()));
+ }
+
static class DummyDataRegion extends DataRegion {
DummyDataRegion(String systemInfoDir, String storageGroupName) throws DataRegionException {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
index ca944ce80c..9840b66da7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessorV2Test.java
@@ -110,7 +110,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -169,7 +169,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -256,7 +256,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
@@ -299,7 +299,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -335,7 +335,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);
processor.setTsFileProcessorInfo(tsFileProcessorInfo);
@@ -371,7 +371,7 @@ public class TsFileProcessorV2Test {
SystemFileFactory.INSTANCE.getFile(filePath),
sgInfo,
this::closeTsFileProcessor,
- (tsFileProcessor, updateMap, systemFlushTime) -> true,
+ (tsFileProcessor, updateMap, systemFlushTime) -> {},
true);
TsFileProcessorInfo tsFileProcessorInfo = new TsFileProcessorInfo(sgInfo);