You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/17 05:46:45 UTC
[incubator-iotdb] branch master updated: fix execute flush command
while inserting bug (#916)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8681bb2 fix execute flush command while inserting bug (#916)
8681bb2 is described below
commit 8681bb2d53c094086e8586a05959f5f5f8612896
Author: Jialin Qiao <qj...@mails.tsinghua.edu.cn>
AuthorDate: Tue Mar 17 13:46:39 2020 +0800
fix execute flush command while inserting bug (#916)
* fix flush command bug
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 8 ++---
.../iotdb/db/engine/flush/TsFileFlushPolicy.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 39 +++++++++++-----------
.../db/engine/cache/DeviceMetaDataCacheTest.java | 6 ++--
.../storagegroup/StorageGroupProcessorTest.java | 28 ++++++++--------
.../iotdb/db/engine/storagegroup/TTLTest.java | 10 +++---
6 files changed, 45 insertions(+), 48 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 93df289..6bd9b14 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -303,9 +303,7 @@ public class StorageEngine implements IService {
public void syncCloseAllProcessor() {
logger.info("Start closing all storage group processor");
for (StorageGroupProcessor processor : processorMap.values()) {
- processor.waitForAllCurrentTsFileProcessorsClosed();
- //TODO do we need to wait for all merging tasks to be finished here?
- processor.closeAllResources();
+ processor.syncCloseAllWorkingTsFileProcessors();
}
}
@@ -321,13 +319,13 @@ public class StorageEngine implements IService {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor : new ArrayList<>(
processor.getWorkSequenceTsFileProcessors())) {
- processor.moveOneWorkProcessorToClosingList(true, tsfileProcessor);
+ processor.asyncCloseOneTsFileProcessor(true, tsfileProcessor);
}
} else {
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsfileProcessor : new ArrayList<>(
processor.getWorkUnsequenceTsFileProcessor())) {
- processor.moveOneWorkProcessorToClosingList(false, tsfileProcessor);
+ processor.asyncCloseOneTsFileProcessor(false, tsfileProcessor);
}
}
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
index 0b3b61b..7df2508 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/flush/TsFileFlushPolicy.java
@@ -45,7 +45,7 @@ public interface TsFileFlushPolicy {
tsFileProcessor.getTsFileResource().getFile().getAbsolutePath());
if (tsFileProcessor.shouldClose()) {
- storageGroupProcessor.moveOneWorkProcessorToClosingList(isSeq, tsFileProcessor);
+ storageGroupProcessor.asyncCloseOneTsFileProcessor(isSeq, tsFileProcessor);
} else {
tsFileProcessor.asyncFlush();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 72e05b1..674c0fd 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -94,7 +94,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
* (1) when inserting data into the TsFileProcessor, and the TsFileProcessor shouldFlush() (or
* shouldClose())<br/>
* <p>
- * (2) someone calls waitForAllCurrentTsFileProcessorsClosed(). (up to now, only flush command from
+ * (2) someone calls syncCloseAllWorkingTsFileProcessors(). (up to now, only flush command from
* cli will call this method)<br/>
* <p>
* UnSequence data has the similar process as above.
@@ -102,7 +102,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
* When a sequence TsFileProcessor is submitted to be flushed, the updateLatestFlushTimeCallback()
* method will be called as a callback.<br/>
* <p>
- * When a TsFileProcessor is closed, the closeUnsealedTsFileProcessor() method will be called as a
+ * When a TsFileProcessor is closed, the closeUnsealedTsFileProcessorCallBack() method will be called as a
* callback.
*/
public class StorageGroupProcessor {
@@ -386,7 +386,7 @@ public class StorageGroupProcessor {
// the last file is not closed, continue writing to in
TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
schema, getVersionControllerByTimePartitionId(timePartitionId),
- this::closeUnsealedTsFileProcessor,
+ this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true, writer);
workUnsequenceTsFileProcessors
.put(timePartitionId, tsFileProcessor);
@@ -415,7 +415,7 @@ public class StorageGroupProcessor {
// the last file is not closed, continue writing to in
TsFileProcessor tsFileProcessor = new TsFileProcessor(storageGroupName, tsFileResource,
schema, getVersionControllerByTimePartitionId(timePartitionId),
- this::closeUnsealedTsFileProcessor,
+ this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false, writer);
tsFileResource.setProcessor(tsFileProcessor);
tsFileProcessor.setTimeRangeId(timePartitionId);
@@ -745,7 +745,7 @@ public class StorageGroupProcessor {
tsFileProcessorTreeMap.size(),
IoTDBDescriptor.getInstance().getConfig().getMemtableNumInEachStorageGroup() / 2,
storageGroupName);
- moveOneWorkProcessorToClosingList(sequence, processorEntry.getValue());
+ asyncCloseOneTsFileProcessor(sequence, processorEntry.getValue());
}
// build new processor
@@ -787,12 +787,12 @@ public class StorageGroupProcessor {
if (sequence) {
tsFileProcessor = new TsFileProcessor(storageGroupName,
fsFactory.getFileWithParent(filePath),
- schema, versionController, this::closeUnsealedTsFileProcessor,
+ schema, versionController, this::closeUnsealedTsFileProcessorCallBack,
this::updateLatestFlushTimeCallback, true);
} else {
tsFileProcessor = new TsFileProcessor(storageGroupName,
fsFactory.getFileWithParent(filePath),
- schema, versionController, this::closeUnsealedTsFileProcessor,
+ schema, versionController, this::closeUnsealedTsFileProcessorCallBack,
this::unsequenceFlushCallback, false);
}
@@ -820,8 +820,7 @@ public class StorageGroupProcessor {
/**
* thread-safety should be ensured by caller
*/
- public void moveOneWorkProcessorToClosingList(boolean sequence,
- TsFileProcessor tsFileProcessor) {
+ public void asyncCloseOneTsFileProcessor(boolean sequence, TsFileProcessor tsFileProcessor) {
//for sequence tsfile, we update the endTimeMap only when the file is prepared to be closed.
//for unsequence tsfile, we have maintained the endTimeMap when an insertion comes.
if (sequence) {
@@ -852,8 +851,8 @@ public class StorageGroupProcessor {
*/
public void deleteFolder(String systemDir) {
logger.info("{} will close all files for deleting data folder {}", storageGroupName, systemDir);
- waitForAllCurrentTsFileProcessorsClosed();
writeLock();
+ syncCloseAllWorkingTsFileProcessors();
try {
File storageGroupFolder = SystemFileFactory.INSTANCE.getFile(systemDir, storageGroupName);
if (storageGroupFolder.exists()) {
@@ -885,7 +884,8 @@ public class StorageGroupProcessor {
public void syncDeleteDataFiles() {
logger.info("{} will close all files for deleting data files", storageGroupName);
- waitForAllCurrentTsFileProcessorsClosed();
+ writeLock();
+ syncCloseAllWorkingTsFileProcessors();
//normally, mergingModification is just need to be closed by after a merge task is finished.
//we close it here just for IT test.
if (this.mergingModification != null) {
@@ -896,7 +896,6 @@ public class StorageGroupProcessor {
}
}
- writeLock();
try {
closeAllResources();
List<String> folder = DirectoryManager.getInstance().getAllSequenceFileFolders();
@@ -995,17 +994,17 @@ public class StorageGroupProcessor {
/**
* This method will be blocked until all tsfile processors are closed.
*/
- public void waitForAllCurrentTsFileProcessorsClosed() {
+ public void syncCloseAllWorkingTsFileProcessors() {
synchronized (closeStorageGroupCondition) {
try {
- putAllWorkingTsFileProcessorIntoClosingList();
+ asyncCloseAllWorkingTsFileProcessors();
long startTime = System.currentTimeMillis();
while (!closingSequenceTsFileProcessor.isEmpty() || !closingUnSequenceTsFileProcessor
.isEmpty()) {
closeStorageGroupCondition.wait(60_000);
if (System.currentTimeMillis() - startTime > 60_000) {
logger.warn("{} has spent {}s to wait for closing all TsFiles.", this.storageGroupName,
- (System.currentTimeMillis() - startTime)/1000);
+ (System.currentTimeMillis() - startTime) / 1000);
}
}
} catch (InterruptedException e) {
@@ -1015,19 +1014,19 @@ public class StorageGroupProcessor {
}
}
- public void putAllWorkingTsFileProcessorIntoClosingList() {
+ public void asyncCloseAllWorkingTsFileProcessors() {
writeLock();
try {
logger.info("async force close all files in storage group: {}", storageGroupName);
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workSequenceTsFileProcessors.values())) {
- moveOneWorkProcessorToClosingList(true, tsFileProcessor);
+ asyncCloseOneTsFileProcessor(true, tsFileProcessor);
}
// to avoid concurrent modification problem, we need a new array list
for (TsFileProcessor tsFileProcessor : new ArrayList<>(
workUnsequenceTsFileProcessors.values())) {
- moveOneWorkProcessorToClosingList(false, tsFileProcessor);
+ asyncCloseOneTsFileProcessor(false, tsFileProcessor);
}
} finally {
writeUnlock();
@@ -1309,7 +1308,7 @@ public class StorageGroupProcessor {
* put the memtable back to the MemTablePool and make the metadata in writer visible
*/
// TODO please consider concurrency with query and insert method.
- private void closeUnsealedTsFileProcessor(
+ private void closeUnsealedTsFileProcessorCallBack(
TsFileProcessor tsFileProcessor) throws TsFileProcessorException {
closeQueryLock.writeLock().lock();
try {
@@ -1370,7 +1369,7 @@ public class StorageGroupProcessor {
}
logger.info("{} will close all files for starting a merge (fullmerge = {})", storageGroupName,
fullMerge);
- waitForAllCurrentTsFileProcessorsClosed();
+ syncCloseAllWorkingTsFileProcessors();
if (unSequenceFileList.isEmpty() || sequenceFileTreeSet.isEmpty()) {
logger.info("{} no files to be merged", storageGroupName);
return;
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
index ff05387..a46be2a 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/cache/DeviceMetaDataCacheTest.java
@@ -106,17 +106,17 @@ public class DeviceMetaDataCacheTest {
for (int j = 11; j <= 20; j++) {
insertOneRecord(j, j);
}
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+ storageGroupProcessor.asyncCloseAllWorkingTsFileProcessors();
for (int j = 21; j <= 30; j += 2) {
insertOneRecord(j, 0); // will be covered when read
}
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
for (int j = 21; j <= 30; j += 2) {
insertOneRecord(j, j);
}
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
insertOneRecord(2, 100);
}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index e86a985..0abeb56 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -89,7 +89,7 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(10000, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(1000)));
processor.insert(new InsertPlan(record));
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllWorkingTsFileProcessors();
for (int j = 1; j <= 10; j++) {
record = new TSRecord(j, deviceId);
@@ -137,10 +137,10 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllWorkingTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
@@ -178,7 +178,7 @@ public class StorageGroupProcessorTest {
batchInsertPlan1.setRowCount(times.length);
processor.insertBatch(batchInsertPlan1);
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllWorkingTsFileProcessors();
BatchInsertPlan batchInsertPlan2 = new BatchInsertPlan("root.vehicle.d0", measurements,
dataTypes);
@@ -193,8 +193,8 @@ public class StorageGroupProcessorTest {
batchInsertPlan2.setRowCount(times.length);
processor.insertBatch(batchInsertPlan2);
- processor.putAllWorkingTsFileProcessorIntoClosingList();
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.asyncCloseAllWorkingTsFileProcessors();
+ processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
@@ -214,18 +214,18 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllWorkingTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllWorkingTsFileProcessors();
for (int j = 10; j >= 1; j--) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllWorkingTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource = processor.query(deviceId, measurementId, context,
null, null);
@@ -247,18 +247,18 @@ public class StorageGroupProcessorTest {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllWorkingTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllWorkingTsFileProcessors();
for (int j = 10; j >= 1; j--) {
TSRecord record = new TSRecord(j, deviceId);
record.addTuple(DataPoint.getDataPoint(TSDataType.INT32, measurementId, String.valueOf(j)));
processor.insert(new InsertPlan(record));
- processor.putAllWorkingTsFileProcessorIntoClosingList();
+ processor.asyncCloseAllWorkingTsFileProcessors();
}
- processor.waitForAllCurrentTsFileProcessorsClosed();
+ processor.syncCloseAllWorkingTsFileProcessors();
processor.merge(true);
while (mergeLock.get() == 0) {
// wait
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index d28006b..24ad39f 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -83,7 +83,7 @@ public class TTLTest {
@After
public void tearDown() throws IOException, StorageEngineException {
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
EnvironmentUtils.cleanEnv();
}
@@ -160,7 +160,7 @@ public class TTLTest {
insertPlan.setTime(initTime - 2000 + i);
storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+ storageGroupProcessor.asyncCloseAllWorkingTsFileProcessors();
}
}
// unsequence data
@@ -168,7 +168,7 @@ public class TTLTest {
insertPlan.setTime(initTime - 2000 + i);
storageGroupProcessor.insert(insertPlan);
if ((i + 1) % 300 == 0) {
- storageGroupProcessor.putAllWorkingTsFileProcessorIntoClosingList();
+ storageGroupProcessor.asyncCloseAllWorkingTsFileProcessors();
}
}
}
@@ -225,7 +225,7 @@ public class TTLTest {
public void testTTLRemoval() throws StorageEngineException, QueryProcessException {
prepareData();
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
// files before ttl
File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
@@ -335,7 +335,7 @@ public class TTLTest {
@Test
public void testTTLCleanFile() throws QueryProcessException {
prepareData();
- storageGroupProcessor.waitForAllCurrentTsFileProcessorsClosed();
+ storageGroupProcessor.syncCloseAllWorkingTsFileProcessors();
assertEquals(4, storageGroupProcessor.getSequenceFileTreeSet().size());
assertEquals(4, storageGroupProcessor.getUnSequenceFileList().size());