You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/03/31 12:01:43 UTC
[incubator-iotdb] branch fix_many_bugs updated: fix partition -1
when recover
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch fix_many_bugs
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/fix_many_bugs by this push:
new e7c387d fix partition -1 when recover
e7c387d is described below
commit e7c387d5ee4ba2d7afeec7be498a845f08d56882
Author: qiaojialin <64...@qq.com>
AuthorDate: Tue Mar 31 20:02:08 2020 +0800
fix partition -1 when recover
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 3 +-
.../engine/storagegroup/StorageGroupProcessor.java | 45 +++++++---------------
.../org/apache/iotdb/db/utils/FilePathUtils.java | 5 +++
.../writelog/recover/TsFileRecoverPerformer.java | 4 --
4 files changed, 20 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3ef7e0a..0e6641c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -498,8 +498,7 @@ public class StorageEngine implements IService {
if (!sequenceFile.isClosed()) {
continue;
}
- String[] fileSplits = FilePathUtils.splitTsFilePath(sequenceFile);
- long partitionNum = Long.parseLong(fileSplits[fileSplits.length - 2]);
+ long partitionNum = FilePathUtils.getTsFileResourcePartition(sequenceFile);
Map<Long, List<TsFileResource>> storageGroupFiles = ret.computeIfAbsent(entry.getKey()
, n -> new HashMap<>());
storageGroupFiles.computeIfAbsent(partitionNum, n -> new ArrayList<>()).add(sequenceFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 97c2be9..07718a4 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -262,8 +262,7 @@ public class StorageGroupProcessor {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
- String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
- long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+ long partitionNum = FilePathUtils.getTsFileResourcePartition(resource);
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
for (TsFileResource resource : unseqTsFiles) {
@@ -271,8 +270,7 @@ public class StorageGroupProcessor {
if (resource.getFile().length() == 0) {
deleteTsfile(resource.getFile());
}
- String[] filePathSplit = FilePathUtils.splitTsFilePath(resource);
- long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+ long partitionNum = FilePathUtils.getTsFileResourcePartition(resource);
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>()).addAll(resource.getHistoricalVersions());
}
@@ -296,30 +294,16 @@ public class StorageGroupProcessor {
}
for (TsFileResource resource : sequenceFileTreeSet) {
- long timePartitionId = getTimePartitionFromTsFileResource(resource);
- if (timePartitionId != -1) {
- latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
- .putAll(resource.getEndTimeMap());
- partitionLatestFlushedTimeForEachDevice
- .computeIfAbsent(timePartitionId, id -> new HashMap<>())
- .putAll(resource.getEndTimeMap());
- globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
- }
+ long timePartitionId = FilePathUtils.getTsFileResourcePartition(resource);
+ latestTimeForEachDevice.computeIfAbsent(timePartitionId, l -> new HashMap<>())
+ .putAll(resource.getEndTimeMap());
+ partitionLatestFlushedTimeForEachDevice
+ .computeIfAbsent(timePartitionId, id -> new HashMap<>())
+ .putAll(resource.getEndTimeMap());
+ globalLatestFlushedTimeForEachDevice.putAll(resource.getEndTimeMap());
}
}
- private long getTimePartitionFromTsFileResource(TsFileResource resource) {
- // device id -> start map
- // if start time map is empty, tsfile resource is empty, return -1;
- Map<String, Long> startTimeMap = resource.getStartTimeMap();
- // just find any time of device
- Iterator<Long> iterator = startTimeMap.values().iterator();
- if (iterator.hasNext()) {
- return StorageEngine.getTimePartition(iterator.next());
- }
-
- return -1;
- }
/**
* get version controller by time partition Id Thread-safety should be ensure by caller
@@ -393,7 +377,7 @@ public class StorageGroupProcessor {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
sequenceFileTreeSet.add(tsFileResource);
- long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+ long timePartitionId = FilePathUtils.getTsFileResourcePartition(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, false,
@@ -422,7 +406,7 @@ public class StorageGroupProcessor {
for (int i = 0; i < tsFiles.size(); i++) {
TsFileResource tsFileResource = tsFiles.get(i);
unSequenceFileList.add(tsFileResource);
- long timePartitionId = getTimePartitionFromTsFileResource(tsFileResource);
+ long timePartitionId = FilePathUtils.getTsFileResourcePartition(tsFileResource);
TsFileRecoverPerformer recoverPerformer = new TsFileRecoverPerformer(storageGroupName + "-",
getVersionControllerByTimePartitionId(timePartitionId), tsFileResource, true,
@@ -1219,7 +1203,7 @@ public class StorageGroupProcessor {
continue;
}
- long partitionId = getTimePartitionFromTsFileResource(tsFileResource);
+ long partitionId = FilePathUtils.getTsFileResourcePartition(tsFileResource);
deletion.setVersionNum(getVersionControllerByTimePartitionId(partitionId).nextVersion());
// write deletion into modification file
@@ -1566,7 +1550,7 @@ public class StorageGroupProcessor {
// check whether the file name needs to be renamed.
if (!sequenceFileTreeSet.isEmpty()) {
String newFileName = getFileNameForLoadingFile(tsfileToBeInserted.getName(), insertPos,
- getTimePartitionFromTsFileResource(newTsFileResource), sequenceList);
+ FilePathUtils.getTsFileResourcePartition(newTsFileResource), sequenceList);
if (!newFileName.equals(tsfileToBeInserted.getName())) {
logger.info("Tsfile {} must be renamed to {} for loading into the sequence list.",
tsfileToBeInserted.getName(), newFileName);
@@ -1579,8 +1563,7 @@ public class StorageGroupProcessor {
// update latest time map
updateLatestTimeMap(newTsFileResource);
- String[] filePathSplit = FilePathUtils.splitTsFilePath(newTsFileResource);
- long partitionNum = Long.parseLong(filePathSplit[filePathSplit.length - 2]);
+ long partitionNum = FilePathUtils.getTsFileResourcePartition(newTsFileResource);
partitionDirectFileVersions.computeIfAbsent(partitionNum, p -> new HashSet<>())
.addAll(newTsFileResource.getHistoricalVersions());
} catch (DiskSpaceInsufficientException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
index d89e6fb..b5168b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/FilePathUtils.java
@@ -45,4 +45,9 @@ public class FilePathUtils {
public static String[] splitTsFilePath(TsFileResource resource) {
return resource.getFile().getAbsolutePath().split(PATH_SPLIT_STRING);
}
+
+ public static long getTsFileResourcePartition(TsFileResource resource) {
+ String[] splits = splitTsFilePath(resource);
+ return Long.parseLong(splits[splits.length - 2]);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
index ad22e19..1e05b9e 100644
--- a/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/writelog/recover/TsFileRecoverPerformer.java
@@ -81,9 +81,6 @@ public class TsFileRecoverPerformer {
*/
public RestorableTsFileIOWriter recover() throws StorageGroupProcessorException {
- IMemTable recoverMemTable = new PrimitiveMemTable();
- this.logReplayer = new LogReplayer(logNodePrefix, insertFilePath, resource.getModFile(),
- versionController, resource, recoverMemTable, acceptUnseq);
File insertFile = FSFactoryProducer.getFSFactory().getFile(insertFilePath);
if (!insertFile.exists()) {
logger.error("TsFile {} is missing, will skip its recovery.", insertFilePath);
@@ -197,7 +194,6 @@ public class TsFileRecoverPerformer {
try {
if (!recoverMemTable.isEmpty()) {
// flush logs
-
MemTableFlushTask tableFlushTask = new MemTableFlushTask(recoverMemTable,
restorableTsFileIOWriter, resource.getFile().getParentFile().getParentFile().getName());
tableFlushTask.syncFlushMemTable();