You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2020/04/28 12:39:07 UTC
[incubator-iotdb] branch fix_merge_stuck updated: add lock log
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch fix_merge_stuck
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/fix_merge_stuck by this push:
new f1b515a add lock log
f1b515a is described below
commit f1b515a4d3d6fe1a586dacc140d97771fce9e24c
Author: EJTTianyu <16...@qq.com>
AuthorDate: Tue Apr 28 20:37:50 2020 +0800
add lock log
---
.../engine/storagegroup/StorageGroupProcessor.java | 99 ++++++++++++----------
.../iotdb/db/query/control/FileReaderManager.java | 40 +++++----
2 files changed, 77 insertions(+), 62 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index ac40adf..120dd58 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -200,22 +200,20 @@ public class StorageGroupProcessor {
private TsFileFlushPolicy fileFlushPolicy;
/**
- * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close,
- * not including the files generated by merge) of each partition.
- * As data file close is managed by the leader in the distributed version, the files with the
- * same version(s) have the same data, despite that the inner structure (the size and
- * organization of chunks) may be different, so we can easily find what remote files we do not
- * have locally.
- * partition number -> version number set
+ * partitionDirectFileVersions records the versions of the direct TsFiles (generated by close, not
+ * including the files generated by merge) of each partition. As data file close is managed by the
+ * leader in the distributed version, the files with the same version(s) have the same data,
+ * despite that the inner structure (the size and organization of chunks) may be different, so we
+ * can easily find what remote files we do not have locally. partition number -> version number
+ * set
*/
private Map<Long, Set<Long>> partitionDirectFileVersions = new HashMap<>();
/**
- * The max file versions in each partition. By recording this, if several IoTDB instances have
- * the same policy of closing file and their ingestion is identical, then files of the same
- * version in different IoTDB instance will have identical data, providing convenience for data
- * comparison across different instances.
- * partition number -> max version number
+ * The max file versions in each partition. By recording this, if several IoTDB instances have the
+ * same policy of closing file and their ingestion is identical, then files of the same version in
+ * different IoTDB instance will have identical data, providing convenience for data comparison
+ * across different instances. partition number -> max version number
*/
private Map<Long, Long> partitionMaxFileVersions = new HashMap<>();
@@ -252,12 +250,14 @@ public class StorageGroupProcessor {
for (TsFileResource resource : sequenceFileTreeSet) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
for (TsFileResource resource : unSequenceFileList) {
long partitionNum = resource.getTimePartition();
- partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
+ partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
+ .addAll(resource.getHistoricalVersions());
updatePartitionFileVersion(partitionNum, Collections.max(resource.getHistoricalVersions()));
}
@@ -335,13 +335,13 @@ public class StorageGroupProcessor {
// the process was interrupted before the merged files could be named
continueFailedRenames(partitionFolder, MERGE_SUFFIX);
- if (!partitionFolder.isDirectory()) {
- logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
- continue;
- }
+ if (!partitionFolder.isDirectory()) {
+ logger.warn("{} is not a directory.", partitionFolder.getAbsolutePath());
+ continue;
+ }
- Collections.addAll(tsFiles,
- fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
+ Collections.addAll(tsFiles,
+ fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
}
}
@@ -463,7 +463,8 @@ public class StorageGroupProcessor {
long timePartitionId = StorageEngine.getTimePartition(insertPlan.getTime());
latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>());
- partitionLatestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>());
// insert to sequence or unSequence file
insertToTsFileProcessor(insertPlan,
@@ -502,7 +503,8 @@ public class StorageGroupProcessor {
// before is first start point
int before = loc;
// before time partition
- long beforeTimePartition = StorageEngine.getTimePartition(insertTabletPlan.getTimes()[before]);
+ long beforeTimePartition = StorageEngine
+ .getTimePartition(insertTabletPlan.getTimes()[before]);
// init map
long lastFlushTime = partitionLatestFlushedTimeForEachDevice.
computeIfAbsent(beforeTimePartition, id -> new HashMap<>()).
@@ -1189,7 +1191,8 @@ public class StorageGroupProcessor {
}
}
- private void logDeletion(long timestamp, String deviceId, String measurementId, long timePartitionId)
+ private void logDeletion(long timestamp, String deviceId, String measurementId,
+ long timePartitionId)
throws IOException {
if (IoTDBDescriptor.getInstance().getConfig().isEnableWal()) {
DeletePlan deletionPlan = new DeletePlan(timestamp, new Path(deviceId, measurementId));
@@ -1430,8 +1433,10 @@ public class StorageGroupProcessor {
@SuppressWarnings("squid:S1141")
private void updateMergeModification(TsFileResource seqFile) {
- logger.warn("start update merge mod");
+ logger.warn("start update merge mod, current File: {}, Read Lock Num: {}",
+ seqFile.getFile().toString(), seqFile.getWriteQueryLock().getReadLockCount());
seqFile.getWriteQueryLock().writeLock().lock();
+ logger.warn("get the write query lock");
try {
// remove old modifications and write modifications generated during merge
seqFile.removeModFile();
@@ -1451,6 +1456,7 @@ public class StorageGroupProcessor {
seqFile.getFile(), e);
} finally {
seqFile.getWriteQueryLock().writeLock().unlock();
+ logger.warn("rel the write query lock");
logger.warn("end update merge mod");
}
}
@@ -1485,7 +1491,7 @@ public class StorageGroupProcessor {
TsFileResource seqFile = seqFiles.get(i);
mergeLock.writeLock().lock();
try {
- logger.warn("start merge modification");
+ logger.warn("start merge modification, File: {}", seqFile.getFile().toString());
updateMergeModification(seqFile);
if (i == seqFiles.size() - 1) {
//FIXME if there is an exception, the the modification file will be not closed.
@@ -1520,7 +1526,7 @@ public class StorageGroupProcessor {
mergeLock.writeLock().lock();
try {
if (loadTsFileByType(LoadTsFileType.LOAD_SEQUENCE, tsfileToBeInserted, newTsFileResource,
- newFilePartitionId)){
+ newFilePartitionId)) {
updateLatestTimeMap(newTsFileResource);
}
} catch (DiskSpaceInsufficientException e) {
@@ -1587,7 +1593,8 @@ public class StorageGroupProcessor {
long partitionNum = newTsFileResource.getTimePartition();
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(partitionNum, Collections.max(newTsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(partitionNum,
+ Collections.max(newTsFileResource.getHistoricalVersions()));
} catch (DiskSpaceInsufficientException e) {
logger.error(
"Failed to append the tsfile {} to storage group processor {} because the disk space is insufficient.",
@@ -1601,12 +1608,12 @@ public class StorageGroupProcessor {
}
/**
- * Find the position of "newTsFileResource" in the sequence files if it can be inserted into them.
- * @param newTsFileResource
- * @param newFilePartitionId
- * @return POS_ALREADY_EXIST(-2) if some file has the same name as the one to be inserted
- * POS_OVERLAP(-3) if some file overlaps the new file
- * an insertion position i >= -1 if the new file can be inserted between [i, i+1]
+ * Find the position of "newTsFileResource" in the sequence files if it can be inserted into
+ * them.
+ *
+ * @return POS_ALREADY_EXIST(- 2) if some file has the same name as the one to be inserted
+ * POS_OVERLAP(-3) if some file overlaps the new file an insertion position i >= -1 if the new
+ * file can be inserted between [i, i+1]
*/
private int findInsertionPosition(TsFileResource newTsFileResource, long newFilePartitionId,
List<TsFileResource> sequenceList) {
@@ -1647,11 +1654,9 @@ public class StorageGroupProcessor {
/**
* Compare each device in the two files to find the time relation of them.
- * @param fileA
- * @param fileB
- * @return -1 if fileA is totally older than fileB (A < B)
- * 0 if fileA is partially older than fileB and partially newer than fileB (A X B)
- * 1 if fileA is totally newer than fileB (B < A)
+ *
+ * @return -1 if fileA is totally older than fileB (A < B) 0 if fileA is partially older than
+ * fileB and partially newer than fileB (A X B) 1 if fileA is totally newer than fileB (B < A)
*/
private int compareTsFileDevices(TsFileResource fileA, TsFileResource fileB) {
boolean hasPre = false, hasSubsequence = false;
@@ -1688,10 +1693,8 @@ public class StorageGroupProcessor {
/**
* If the historical versions of a file is a sub-set of the given file's, remove it to reduce
- * unnecessary merge. Only used when the file sender and the receiver share the same file
- * close policy.
- * Warning: DO NOT REMOVE
- * @param resource
+ * unnecessary merge. Only used when the file sender and the receiver share the same file close
+ * policy. Warning: DO NOT REMOVE
*/
@SuppressWarnings("unused")
public void removeFullyOverlapFiles(TsFileResource resource) {
@@ -1752,8 +1755,8 @@ public class StorageGroupProcessor {
* version number is the version number in the tsfile with a larger timestamp.
*
* @param tsfileName origin tsfile name
- * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex
- * + 1]
+ * @param insertIndex the new file will be inserted between the files [insertIndex, insertIndex +
+ * 1]
* @return appropriate filename
*/
private String getFileNameForLoadingFile(String tsfileName, int insertIndex,
@@ -1819,9 +1822,9 @@ public class StorageGroupProcessor {
* @param type load type
* @param tsFileResource tsfile resource to be loaded
* @param filePartitionId the partition id of the new file
- * @UsedBy sync module, load external tsfile module.
* @return load the file successfully
* @UsedBy sync module, load external tsfile module.
+ * @UsedBy sync module, load external tsfile module.
*/
private boolean loadTsFileByType(LoadTsFileType type, File syncedTsFile,
TsFileResource tsFileResource, long filePartitionId)
@@ -1830,7 +1833,8 @@ public class StorageGroupProcessor {
switch (type) {
case LOAD_UNSEQUENCE:
targetFile = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
- storageGroupName + File.separatorChar + filePartitionId + File.separator + tsFileResource
+ storageGroupName + File.separatorChar + filePartitionId + File.separator
+ + tsFileResource
.getFile().getName());
tsFileResource.setFile(targetFile);
if (unSequenceFileList.contains(tsFileResource)) {
@@ -1890,7 +1894,8 @@ public class StorageGroupProcessor {
}
partitionDirectFileVersions.computeIfAbsent(filePartitionId,
p -> new HashSet<>()).addAll(tsFileResource.getHistoricalVersions());
- updatePartitionFileVersion(filePartitionId, Collections.max(tsFileResource.getHistoricalVersions()));
+ updatePartitionFileVersion(filePartitionId,
+ Collections.max(tsFileResource.getHistoricalVersions()));
return true;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
index f217a79..ef727ee 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/control/FileReaderManager.java
@@ -37,8 +37,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
- * FileReaderManager is a singleton, which is used to manage
- * all file readers(opened file streams) to ensure that each file is opened at most once.
+ * FileReaderManager is a singleton, which is used to manage all file readers(opened file streams)
+ * to ensure that each file is opened at most once.
*/
public class FileReaderManager implements IService {
@@ -51,24 +51,24 @@ public class FileReaderManager implements IService {
private static final int MAX_CACHED_FILE_SIZE = 30000;
/**
- * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
- * is the corresponding reader.
+ * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the
+ * corresponding reader.
*/
private Map<String, TsFileSequenceReader> closedFileReaderMap;
/**
- * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
- * is the corresponding reader.
+ * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap is the
+ * corresponding reader.
*/
private Map<String, TsFileSequenceReader> unclosedFileReaderMap;
/**
- * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap
- * is the file's reference count.
+ * the key of closedFileReaderMap is the file path and the value of closedFileReaderMap is the
+ * file's reference count.
*/
private Map<String, AtomicInteger> closedReferenceMap;
/**
- * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap
- * is the file's reference count.
+ * the key of unclosedFileReaderMap is the file path and the value of unclosedFileReaderMap is the
+ * file's reference count.
*/
private Map<String, AtomicInteger> unclosedReferenceMap;
@@ -131,7 +131,8 @@ public class FileReaderManager implements IService {
iterator.remove();
refMap.remove(entry.getKey());
if (resourceLogger.isDebugEnabled()) {
- resourceLogger.debug("{} TsFileReader is closed because of no reference.", entry.getKey());
+ resourceLogger
+ .debug("{} TsFileReader is closed because of no reference.", entry.getKey());
}
}
}
@@ -174,12 +175,17 @@ public class FileReaderManager implements IService {
* of a reader equals zero, the reader can be closed and removed.
*/
void increaseFileReaderReference(TsFileResource tsFile, boolean isClosed) {
+ logger.warn("Apply read lock");
tsFile.getWriteQueryLock().readLock().lock();
+ logger.warn("Apply read lock to {}, current read lock num: {}", tsFile.getFile().toString(),
+ tsFile.getWriteQueryLock().getReadLockCount());
synchronized (this) {
if (!isClosed) {
- unclosedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger()).getAndIncrement();
+ unclosedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger())
+ .getAndIncrement();
} else {
- closedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger()).getAndIncrement();
+ closedReferenceMap.computeIfAbsent(tsFile.getPath(), k -> new AtomicInteger())
+ .getAndIncrement();
}
}
}
@@ -192,11 +198,14 @@ public class FileReaderManager implements IService {
synchronized (this) {
if (!isClosed && unclosedReferenceMap.containsKey(tsFile.getPath())) {
unclosedReferenceMap.get(tsFile.getPath()).decrementAndGet();
- } else if (closedReferenceMap.containsKey(tsFile.getPath())){
+ } else if (closedReferenceMap.containsKey(tsFile.getPath())) {
closedReferenceMap.get(tsFile.getPath()).decrementAndGet();
}
}
+ logger.warn("release read lock");
tsFile.getWriteQueryLock().readLock().unlock();
+ logger.warn("release read lock: {}, current read lock num: {}", tsFile.getFile().toString(),
+ tsFile.getWriteQueryLock().getReadLockCount());
}
/**
@@ -204,7 +213,8 @@ public class FileReaderManager implements IService {
* integration tests will not conflict with each other.
*/
public synchronized void closeAndRemoveAllOpenedReaders() throws IOException {
- Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet().iterator();
+ Iterator<Map.Entry<String, TsFileSequenceReader>> iterator = closedFileReaderMap.entrySet()
+ .iterator();
while (iterator.hasNext()) {
Map.Entry<String, TsFileSequenceReader> entry = iterator.next();
entry.getValue().close();