You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/17 02:39:45 UTC
[incubator-iotdb] 01/01: fix flush
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_flush_close_file_error
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 9302734bfdc2bf0092abaf3f6a5bb406864a4f62
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Mar 17 10:39:25 2020 +0800
fix flush
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 8 ++--
.../iotdb/db/engine/flush/TsFileFlushPolicy.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 54 ++++++++++++----------
.../db/engine/cache/DeviceMetaDataCacheTest.java | 6 +--
.../storagegroup/StorageGroupProcessorTest.java | 28 +++++------
.../iotdb/db/engine/storagegroup/TTLTest.java | 10 ++--
6 files changed, 55 insertions(+), 53 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 93df289..6b742d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -303,9 +303,7 @@ public class StorageEngine implements IService {
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
for (StorageGroupProcessor processor : processorMap.values()) {
- processor.waitForAllCurrentTsFileProcessorsClosed();
- //TODO do we need to wait for all merging tasks to be finished here?
- processor.closeAllResources();
+ processor.syncCloseAllTsFileProcessors();
}
}
@@ -321,13 +319,13 @@ public class StorageEngine implements IService {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor : new ArrayList<>(
processor.getWorkSequenceTsFileProcessors())) {
- processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor);
+ processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
}
} else {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor : new ArrayList<>(
processor.getWorkUnsequenceTsFileProcessor())) {
- processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor);
+ processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
}
}
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index 0b3b61b..7df2508 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -45,7 +45,7 @@ public interface TsFileFlushPolicy {
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
if (tsFileProcessor.shouldClose()) {
- storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor);
+ storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
} else {
tsFileProcessor.asyncFlush();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 72e05b1..03ac594 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -94,7 +94,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
* (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
* shouldClose())<br/>
* <p>
- * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
+ * (2) someone calls syncCloseAllTsFileProcessors(). (up to now, only flush command from
* cli will call this method)<br/>
* <p>
* UnSequence data has the similar process as above.
@@ -745,7 +745,7 @@ public class StorageGroupProcessor {
tsFileProcessorTreeMap.size(),
IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2,
storageGroupName);
- moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue());
+ asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
}
// build new processor
@@ -820,8 +820,7 @@ public class StorageGroupProcessor {
/**
* thread-safety should be ensured by caller
*/
- public void moveOneWorkProcessorToClosingList(boolean sequence,
- TsFileProcessor tsFileProcessor) {
+ public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
if (sequence) {
@@ -852,8 +851,8 @@ public class StorageGroupProcessor {
*/
public void deleteFolder(String systemDir) {
logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
- waitForAllCurrentTsFileProcessorsClosed();
writeLock();
+ syncCloseAllTsFileProcessors();
try {
File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
if (storageGroupFolder.exists()) {
@@ -885,7 +884,8 @@ public class StorageGroupProcessor {
public void syncDeleteDataFiles() {
logger.info("{} will close all files for deleting data files", storageGroupName);
- waitForAllCurrentTsFileProcessorsClosed();
+ writeLock();
+ syncCloseAllTsFileProcessors();
//normally, mergingModification is just need to be closed by after a merge task is finished.
//we close it here just for IT test.
if (this.mergingModification != null) {
@@ -896,7 +896,6 @@ public class StorageGroupProcessor {
}
}
- writeLock();
try {
closeAllResources();
List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -995,39 +994,44 @@ public class StorageGroupProcessor {
/**
* This method will be blocked until all tsfile processors are closed.
*/
- public void waitForAllCurrentTsFileProcessorsClosed() {
- synchronized (closeStorageGroupCondition) {
- try {
- putAllWorkingTsFileProcessorIntoClosingList();
- long startTime = System.currentTimeMillis();
- while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
- .isEmpty()) {
- closeStorageGroupCondition.wait(60_000);
- if (System.currentTimeMillis() - startTime > 60_000) {
- logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
- (System.currentTimeMillis() - startTime)/1000);
+ public void syncCloseAllTsFileProcessors() {
+ writeLock();
+ try {
+ synchronized (closeStorageGroupCondition) {
+ try {
+ asyncCloseAllTsFileProcessors();
+ long startTime = System.currentTimeMillis();
+ while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
+ .isEmpty()) {
+ closeStorageGroupCondition.wait(60_000);
+ if (System.currentTimeMillis() - startTime > 60_000) {
+ logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
+ (System.currentTimeMillis() - startTime)/1000);
+ }
}
+ } catch (InterruptedException e) {
+ logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
+ + "group {}", storageGroupName, e);
}
- } catch (InterruptedException e) {
- logger.error("CloseFileNodeCondition error occurs while waiting for closing the storage "
- + "group {}", storageGroupName, e);
}
+ } finally {
+ writeUnlock();
}
}
- public void putAllWorkingTsFileProcessorIntoClosingList() {
+ public void asyncCloseAllTsFileProcessors() {
writeLock();
try {
logger.info("async force close all files in storage group: {}", storageGroupName);
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workSequenceTsFileProcessors.values())) {
- moveOneWorkProcessorToClosingList(true, tsFileProcessor);
+ asyncCloseOneTsFileProcessor(true, tsFileProcessor);
}
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workUnsequenceTsFileProcessors.values())) {
- moveOneWorkProcessorToClosingList(false, tsFileProcessor);
+ asyncCloseOneTsFileProcessor(false, tsFileProcessor);
}
} finally {
writeUnlock();
@@ -1370,7 +1374,7 @@ public class StorageGroupProcessor {
}
logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
fullMerge);
- waitForAllCurrentTsFileProcessorsClosed();
+ syncCloseAllTsFileProcessors();
if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) {
logger.info("{} no files to be merged", storageGroupName);
return;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index ff05387..2f51e18 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -106,17 +106,17 @@ public class DeviceMetaDataCacheTest {
for (int j = 11; j <= 20; j++) {
insertOneRecord(j, j);
}
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+ storageGroupProcessor.asyncCloseAllTsFileProcessors();
for (int j = 21; j <= 30; j += 2) {
insertOneRecord(j, 0); // will be covered when read
}
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllTsFileProcessors();
for (int j = 21; j <= 30; j += 2) {
insertOneRecord(j, j);
}
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllTsFileProcessors();
insertOneRecord(2, 100);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e86a985..ec645b3 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -89,7 +89,7 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllTsFileProcessors();
for (int j = 1; j <= 10; j++) {
record = new TSRecord(j, deviceId);
@@ -137,10 +137,10 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
@@ -178,7 +178,7 @@ public class StorageGroupProcessorTest {
batchInsertPlan1.setRowCount(times.length);
processor.insertBatch(batchInsertPlan1);
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllTsFileProcessors();
BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements,
dataTypes);
@@ -193,8 +193,8 @@ public class StorageGroupProcessorTest {
batchInsertPlan2.setRowCount(times.length);
processor.insertBatch(batchInsertPlan2);
- processor.putAllWorkingTsFileProcessorIntoClosingList();
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.asyncCloseAllTsFileProcessors();
+ processor.syncCloseAllTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
@@ -214,18 +214,18 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllTsFileProcessors();
for (int j = 10; j >= 1; j--) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
@@ -247,18 +247,18 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllTsFileProcessors();
for (int j = 10; j >= 1; j--) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllTsFileProcessors();
processor.merge(true);
while (mergeLock.get() == 0) {
// wait
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index d28006b..55eb0a8 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -83,7 +83,7 @@ public class TTLTest {
@After
public void tearDown() throws IOException, StorageEngineException {
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllTsFileProcessors();
EnvironmentUtils.cleanEnv();
}
@@ -160,7 +160,7 @@ public class TTLTest {
insertPlan.setTime(initTime - 2000 + i);
storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+ storageGroupProcessor.asyncCloseAllTsFileProcessors();
}
}
// unsequence data
@@ -168,7 +168,7 @@ public class TTLTest {
insertPlan.setTime(initTime - 2000 + i);
storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+ storageGroupProcessor.asyncCloseAllTsFileProcessors();
}
}
}
@@ -225,7 +225,7 @@ public class TTLTest {
public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
prepareData();
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllTsFileProcessors();
// files before ttl
File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
@@ -335,7 +335,7 @@ public class TTLTest {
@Test
public void testTTLCleanFile() throws QueryProcessException {
prepareData();
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllTsFileProcessors();
assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());