You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/06/25 02:59:13 UTC
[incubator-iotdb] branch feature_async_close_tsfile updated: fix dead lock bug
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch feature_async_close_tsfile
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/feature_async_close_tsfile by this push:
new 8189a87 fix dead lock bug
new 9bf0540 Merge branch 'feature_async_close_tsfile' of github.com:apache/incubator-iotdb into feature_async_close_tsfile
8189a87 is described below
commit 8189a8731f2cf9fd7dcbdc2b99765425038972d9
Author: lta <li...@163.com>
AuthorDate: Tue Jun 25 10:57:23 2019 +0800
fix dead lock bug
---
.../db/engine/filenodeV2/FileNodeProcessorV2.java | 155 ++++++++++-----------
.../filenodeV2/UnsealedTsFileProcessorV2.java | 22 ++-
.../iotdb/db/engine/memtable/MemTablePool.java | 2 +-
3 files changed, 91 insertions(+), 88 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
index 1cb1dc6..d7f6f6b 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/FileNodeProcessorV2.java
@@ -29,12 +29,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
-import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.filenode.CopyOnReadLinkedList;
@@ -66,9 +64,6 @@ public class FileNodeProcessorV2 {
private static final Logger LOGGER = LoggerFactory.getLogger(FileNodeProcessorV2.class);
- private static final MManager mManager = MManager.getInstance();
- private static final DirectoryManager directoryManager = DirectoryManager.getInstance();
-
private FileSchema fileSchema;
// includes sealed and unsealed sequnce tsfiles
@@ -95,7 +90,7 @@ public class FileNodeProcessorV2 {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- private Condition closeFileNodeCondition;
+ private Object closeFileNodeCondition = new Object();
/**
* Mark whether to close file node
@@ -113,7 +108,6 @@ public class FileNodeProcessorV2 {
public FileNodeProcessorV2(String baseDir, String storageGroupName) throws ProcessorException {
this.storageGroupName = storageGroupName;
- closeFileNodeCondition = lock.writeLock().newCondition();
// construct the file schema
this.fileSchema = constructFileSchema(storageGroupName);
@@ -124,7 +118,8 @@ public class FileNodeProcessorV2 {
try {
File storageGroupInfoDir = new File(baseDir, storageGroupName);
if (storageGroupInfoDir.mkdirs()) {
- LOGGER.info("Storage Group Info Directory {} doesn't exist, create it", storageGroupInfoDir.getPath());
+ LOGGER.info("Storage Group Info Directory {} doesn't exist, create it",
+ storageGroupInfoDir.getPath());
}
versionController = new SimpleFileVersionController(
@@ -139,32 +134,32 @@ public class FileNodeProcessorV2 {
private void recover() throws ProcessorException {
LOGGER.info("recover FileNodeProcessor {}", storageGroupName);
List<File> tsFiles = new ArrayList<>();
- List<String> seqFileFolders = directoryManager.getAllTsFileFolders();
- for (String baseDir: seqFileFolders) {
+ List<String> seqFileFolders = DirectoryManager.getInstance().getAllTsFileFolders();
+ for (String baseDir : seqFileFolders) {
File fileFolder = new File(baseDir, storageGroupName);
if (!fileFolder.exists()) {
continue;
}
- for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
+ for (File tsfile : fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX))) {
tsFiles.add(tsfile);
}
}
recoverSeqFiles(tsFiles);
tsFiles.clear();
- List<String> unseqFileFolder = directoryManager.getAllOverflowFileFolders();
- for (String baseDir: unseqFileFolder) {
+ List<String> unseqFileFolder = DirectoryManager.getInstance().getAllOverflowFileFolders();
+ for (String baseDir : unseqFileFolder) {
File fileFolder = new File(baseDir, storageGroupName);
if (!fileFolder.exists()) {
continue;
}
- for (File tsfile: fileFolder.listFiles(file->file.getName().endsWith(TSFILE_SUFFIX))) {
+ for (File tsfile : fileFolder.listFiles(file -> file.getName().endsWith(TSFILE_SUFFIX))) {
tsFiles.add(tsfile);
}
}
recoverUnseqFiles(tsFiles);
- for (TsFileResourceV2 resource: sequenceFileList) {
+ for (TsFileResourceV2 resource : sequenceFileList) {
latestTimeForEachDevice.putAll(resource.getEndTimeMap());
latestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
@@ -172,7 +167,7 @@ public class FileNodeProcessorV2 {
private void recoverSeqFiles(List<File> tsfiles) throws ProcessorException {
tsfiles.sort(new CompareFileName());
- for (File tsfile: tsfiles) {
+ for (File tsfile : tsfiles) {
TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
sequenceFileList.add(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-"
@@ -183,10 +178,11 @@ public class FileNodeProcessorV2 {
private void recoverUnseqFiles(List<File> tsfiles) throws ProcessorException {
tsfiles.sort(new CompareFileName());
- for (File tsfile: tsfiles) {
+ for (File tsfile : tsfiles) {
TsFileResourceV2 tsFileResource = new TsFileResourceV2(tsfile);
unSequenceFileList.add(tsFileResource);
- TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-", fileSchema,
+ TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
+ fileSchema,
versionController, tsFileResource, true);
recoverPerformer.recover();
}
@@ -199,7 +195,7 @@ public class FileNodeProcessorV2 {
String[] items1 = o1.getName().split("-");
String[] items2 = o2.getName().split("-");
if (Long.valueOf(items1[0]) - Long.valueOf(items2[0]) == 0) {
- return Long.compare(Long.valueOf(items1[1]), Long.valueOf(items2[1]));
+ return Long.compare(Long.valueOf(items1[1]), Long.valueOf(items2[1]));
} else {
return Long.compare(Long.valueOf(items1[0]), Long.valueOf(items2[0]));
}
@@ -208,7 +204,7 @@ public class FileNodeProcessorV2 {
private FileSchema constructFileSchema(String storageGroupName) {
List<MeasurementSchema> columnSchemaList;
- columnSchemaList = mManager.getSchemaForFileName(storageGroupName);
+ columnSchemaList = MManager.getInstance().getSchemaForFileName(storageGroupName);
FileSchema schema = new FileSchema();
for (MeasurementSchema measurementSchema : columnSchemaList) {
@@ -237,17 +233,15 @@ public class FileNodeProcessorV2 {
* @return -1: failed, 1: Overflow, 2:Bufferwrite
*/
public boolean insert(InsertPlan insertPlan) {
- lock.writeLock().lock();
-
try {
- if(toBeClosed){
- throw new FileNodeProcessorException("storage group " + storageGroupName + " is to be closed, this insertion is rejected");
+ if (toBeClosed) {
+ throw new FileNodeProcessorException(
+ "storage group " + storageGroupName + " is to be closed, this insertion is rejected");
}
// init map
latestTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
latestFlushedTimeForEachDevice.putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);
- boolean result;
// insert to sequence or unSequence file
if (insertPlan.getTime() > latestFlushedTimeForEachDevice.get(insertPlan.getDeviceId())) {
return insertUnsealedDataFile(insertPlan, true);
@@ -257,12 +251,11 @@ public class FileNodeProcessorV2 {
} catch (FileNodeProcessorException | IOException e) {
LOGGER.error("insert tsRecord to unsealed data file failed, because {}", e.getMessage(), e);
return false;
- } finally {
- lock.writeLock().unlock();
}
}
- private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence) throws IOException {
+ private boolean insertUnsealedDataFile(InsertPlan insertPlan, boolean sequence)
+ throws IOException {
lock.writeLock().lock();
UnsealedTsFileProcessorV2 unsealedTsFileProcessor;
try {
@@ -296,7 +289,7 @@ public class FileNodeProcessorV2 {
}
return result;
- }finally {
+ } finally {
lock.writeLock().unlock();
}
}
@@ -304,9 +297,9 @@ public class FileNodeProcessorV2 {
private UnsealedTsFileProcessorV2 createTsFileProcessor(boolean sequence) throws IOException {
String baseDir;
if (sequence) {
- baseDir = directoryManager.getNextFolderForSequenceFile();
+ baseDir = DirectoryManager.getInstance().getNextFolderForSequenceFile();
} else {
- baseDir = directoryManager.getNextFolderForUnSequenceFile();
+ baseDir = DirectoryManager.getInstance().getNextFolderForUnSequenceFile();
}
new File(baseDir, storageGroupName).mkdirs();
@@ -321,7 +314,7 @@ public class FileNodeProcessorV2 {
} else {
return new UnsealedTsFileProcessorV2(storageGroupName, new File(filePath),
fileSchema, versionController, this::closeUnsealedTsFileProcessorCallback,
- ()->true);
+ () -> true);
}
}
@@ -361,7 +354,7 @@ public class FileNodeProcessorV2 {
if (tsFileResource.isClosed()) {
tsfileResourcesForQuery.add(tsFileResource);
} else {
- Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair = null;
+ Pair<ReadOnlyMemChunk, List<ChunkMetaData>> pair;
try {
pair = tsFileResource
.getUnsealedFileProcessor()
@@ -435,7 +428,8 @@ public class FileNodeProcessorV2 {
}
- private void deleteFiles(List<TsFileResourceV2> tsFileResourceList, Deletion deletion, List<ModificationFile> updatedModFiles)
+ private void deleteFiles(List<TsFileResourceV2> tsFileResourceList, Deletion deletion,
+ List<ModificationFile> updatedModFiles)
throws IOException {
String deviceId = deletion.getDevice();
for (TsFileResourceV2 tsFileResource : tsFileResourceList) {
@@ -459,8 +453,8 @@ public class FileNodeProcessorV2 {
}
/**
- * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the setCloseMark task
- * will be executed by a flush thread.
+ * ensure there must be a flush thread submitted after setCloseMark() is called, therefore the
+ * setCloseMark task will be executed by a flush thread.
*
* only called by insert(), thread-safety should be ensured by caller
*/
@@ -513,6 +507,7 @@ public class FileNodeProcessorV2 {
/**
* when close an UnsealedTsFileProcessor, update its EndTimeMap immediately
+ *
* @param tsFileProcessor processor to be closed
*/
private void updateEndTimeMap(UnsealedTsFileProcessorV2 tsFileProcessor) {
@@ -526,21 +521,22 @@ public class FileNodeProcessorV2 {
/**
* This method will be blocked until all tsfile processors are closed.
*/
- public void syncCloseFileNode(){
- lock.writeLock().lock();
- try {
- asyncForceClose();
- while (true) {
- if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()) {
- break;
+ public void syncCloseFileNode() {
+ synchronized (closeFileNodeCondition) {
+ try {
+ asyncForceClose();
+ while (true) {
+ if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor
+ .isEmpty()) {
+ break;
+ }
+ closeFileNodeCondition.wait();
}
- closeFileNodeCondition.await();
+ } catch (InterruptedException e) {
+ LOGGER
+ .error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
+ storageGroupName, e);
}
- } catch (InterruptedException e) {
- LOGGER.error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
- storageGroupName, e);
- } finally {
- lock.writeLock().unlock();
}
}
@@ -548,23 +544,24 @@ public class FileNodeProcessorV2 {
/**
* This method will be blocked until this file node can be closed.
*/
- public void syncCloseAndStopFileNode(Supplier<Boolean> removeProcessorFromManagerCallback){
- lock.writeLock().lock();
- try {
- asyncForceClose();
- toBeClosed = true;
- while (true) {
- if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor.isEmpty()) {
- removeProcessorFromManagerCallback.get();
- break;
+ public void syncCloseAndStopFileNode(Supplier<Boolean> removeProcessorFromManagerCallback) {
+ synchronized (closeFileNodeCondition) {
+ try {
+ asyncForceClose();
+ toBeClosed = true;
+ while (true) {
+ if (closingSequenceTsFileProcessor.isEmpty() && closingUnSequenceTsFileProcessor
+ .isEmpty()) {
+ removeProcessorFromManagerCallback.get();
+ break;
+ }
+ closeFileNodeCondition.wait();
}
- closeFileNodeCondition.await();
+ } catch (InterruptedException e) {
+ LOGGER
+ .error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
+ storageGroupName, e);
}
- } catch (InterruptedException e) {
- LOGGER.error("CloseFileNodeCondition occurs error while waiting for closing the file node {}",
- storageGroupName, e);
- } finally {
- lock.writeLock().unlock();
}
}
@@ -586,21 +583,19 @@ public class FileNodeProcessorV2 {
* put the memtable back to the MemTablePool and make the metadata in writer visible
*/
// TODO please consider concurrency with query and insert method.
- public void closeUnsealedTsFileProcessorCallback(UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
- lock.writeLock().lock();
- try {
- // end time with one start time
- TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
- resource.setClosed(true);
- if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
- closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
- } else {
- closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
- }
- LOGGER.info("signal closing file node condition");
- closeFileNodeCondition.signal();
- }finally {
- lock.writeLock().unlock();
+ public void closeUnsealedTsFileProcessorCallback(
+ UnsealedTsFileProcessorV2 unsealedTsFileProcessor) {
+ // end time with one start time
+ TsFileResourceV2 resource = unsealedTsFileProcessor.getTsFileResource();
+ resource.setClosed(true);
+ if (closingSequenceTsFileProcessor.contains(unsealedTsFileProcessor)) {
+ closingSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+ } else {
+ closingUnSequenceTsFileProcessor.remove(unsealedTsFileProcessor);
+ }
+ LOGGER.info("signal closing file node condition");
+ synchronized (closeFileNodeCondition) {
+ closeFileNodeCondition.notify();
}
}
@@ -617,7 +612,7 @@ public class FileNodeProcessorV2 {
return storageGroupName;
}
- public int getClosingProcessorSize(){
+ public int getClosingProcessorSize() {
return unSequenceFileList.size() + sequenceFileList.size();
}
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
index 6d41d07..f16e0ab 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/filenodeV2/UnsealedTsFileProcessorV2.java
@@ -100,7 +100,8 @@ public class UnsealedTsFileProcessorV2 {
this.tsFileResource = new TsFileResourceV2(tsfile, this);
this.versionController = versionController;
this.writer = new NativeRestorableIOWriter(tsfile);
- this.logNode = MultiFileLogNodeManager.getInstance().getNode(storageGroupName + "-" + tsfile.getName());
+ this.logNode = MultiFileLogNodeManager.getInstance()
+ .getNode(storageGroupName + "-" + tsfile.getName());
this.closeUnsealedFileCallback = closeUnsealedFileCallback;
this.flushUpdateLatestFlushTimeCallback = flushUpdateLatestFlushTimeCallback;
LOGGER.info("create a new tsfile processor {}", tsfile.getAbsolutePath());
@@ -153,7 +154,7 @@ public class UnsealedTsFileProcessorV2 {
workMemTable
.delete(deletion.getDevice(), deletion.getMeasurement(), deletion.getTimestamp());
}
- for (IMemTable memTable: flushingMemTables) {
+ for (IMemTable memTable : flushingMemTables) {
memTable.delete(deletion);
}
} finally {
@@ -183,7 +184,8 @@ public class UnsealedTsFileProcessorV2 {
}
public void syncClose() {
- LOGGER.info("Synch close file: {}, first async close it", tsFileResource.getFile().getAbsolutePath());
+ LOGGER.info("Synch close file: {}, first async close it",
+ tsFileResource.getFile().getAbsolutePath());
asyncClose();
synchronized (flushingMemTables) {
try {
@@ -274,7 +276,9 @@ public class UnsealedTsFileProcessorV2 {
try {
writer.makeMetadataVisible();
flushingMemTables.remove(memTable);
- LOGGER.info("flush finished, remove a memtable from flushing list, flushing memtable list size: {}", flushingMemTables.size());
+ LOGGER.info(
+ "flush finished, remove a memtable from flushing list, flushing memtable list size: {}",
+ flushingMemTables.size());
} finally {
flushQueryLock.writeLock().unlock();
}
@@ -299,7 +303,8 @@ public class UnsealedTsFileProcessorV2 {
logNode.notifyEndFlush();
LOGGER.info("flush a memtable has finished");
} else {
- LOGGER.info("release an empty memtable from flushing memtable list, which is submitted in force flush");
+ LOGGER.info(
+ "release an empty memtable from flushing memtable list, which is submitted in force flush");
releaseFlushedMemTableCallback(memTableToFlush);
}
@@ -316,6 +321,7 @@ public class UnsealedTsFileProcessorV2 {
flushingMemTables.notify();
}
}
+
}
private void endFile() throws IOException {
@@ -406,8 +412,10 @@ public class UnsealedTsFileProcessorV2 {
ModificationFile modificationFile = tsFileResource.getModFile();
- List<ChunkMetaData> chunkMetaDataList = writer.getVisibleMetadatas(deviceId, measurementId, dataType);
- QueryUtils.modifyChunkMetaData(chunkMetaDataList, (List<Modification>) modificationFile.getModifications());
+ List<ChunkMetaData> chunkMetaDataList = writer
+ .getVisibleMetadatas(deviceId, measurementId, dataType);
+ QueryUtils.modifyChunkMetaData(chunkMetaDataList,
+ (List<Modification>) modificationFile.getModifications());
return new Pair<>(timeValuePairSorter, chunkMetaDataList);
} catch (IOException e) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
index 51936e0..4902ca6 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/memtable/MemTablePool.java
@@ -72,7 +72,7 @@ public class MemTablePool {
} catch (InterruptedException e) {
LOGGER.error("{} fails to wait fot memtables {}, continue to wait", applier, e);
}
- LOGGER.info("{} has waited for a memtable for {}ms", applier, waitCount * WAIT_TIME);
+ LOGGER.info("{} has waited for a memtable for {}ms", applier, waitCount++ * WAIT_TIME);
}
}
}