You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/03 12:00:55 UTC
[incubator-iotdb] 03/03: finish file renaming for loaded sequence tsfiles
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 10688c03206b6ee766fb982711710a8f89f341fe
Author: lta <li...@163.com>
AuthorDate: Tue Sep 3 20:00:36 2019 +0800
finish file renaming for loaded sequence tsfiles
---
.../org/apache/iotdb/db/conf/IoTDBConstant.java | 1 +
.../iotdb/db/engine/merge/task/MergeFileTask.java | 7 +-
.../engine/storagegroup/StorageGroupProcessor.java | 113 ++++++++++++++-------
.../iotdb/db/sync/receiver/load/FileLoader.java | 45 +++++++-
.../db/sync/receiver/load/FileLoaderTest.java | 95 +++++++++--------
.../recover/SyncReceiverLogAnalyzerTest.java | 14 +--
6 files changed, 183 insertions(+), 92 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
index c29472b..0b6f0cc 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConstant.java
@@ -68,5 +68,6 @@ public class IoTDBConstant {
// data folder name
public static final String SEQUENCE_FLODER_NAME = "sequence";
public static final String UNSEQUENCE_FLODER_NAME = "unsequence";
+ public static final String FILE_NAME_SEPARATOR = "-";
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 3e5dc84..32050d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
import org.apache.iotdb.db.engine.merge.manage.MergeContext;
@@ -232,10 +233,12 @@ class MergeFileTask {
}
private File getNextMergeVersionFile(File seqFile) {
- String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "").split("-");
+ String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "")
+ .split(IoTDBConstant.FILE_NAME_SEPARATOR);
int mergeVersion = Integer.parseInt(splits[2]) + 1;
return new File(seqFile.getParentFile(),
- splits[0] + "-" + splits[1] + "-" + mergeVersion + TSFILE_SUFFIX);
+ splits[0] + IoTDBConstant.FILE_NAME_SEPARATOR + splits[1]
+ + IoTDBConstant.FILE_NAME_SEPARATOR + mergeVersion + TSFILE_SUFFIX);
}
private long writeUnmergedChunks(List<Long> chunkStartTimes,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 3555e3d..f61f4e6 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -301,9 +301,11 @@ public class StorageGroupProcessor {
// TsFileNameComparator compares TsFiles by the version number in its name
// ({systemTime}-{versionNum}-{mergeNum}.tsfile)
- public int compareFileName(File o1, File o2) {
- String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split("-");
- String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split("-");
+ private int compareFileName(File o1, File o2) {
+ String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "")
+ .split(IoTDBConstant.FILE_NAME_SEPARATOR);
+ String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "")
+ .split(IoTDBConstant.FILE_NAME_SEPARATOR);
if (Long.valueOf(items1[0]) - Long.valueOf(items2[0]) == 0) {
return Long.compare(Long.valueOf(items1[1]), Long.valueOf(items2[1]));
} else {
@@ -495,8 +497,8 @@ public class StorageGroupProcessor {
new File(baseDir, storageGroupName).mkdirs();
String filePath = Paths.get(baseDir, storageGroupName,
- System.currentTimeMillis() + "-" + versionController.nextVersion()).toString() + "-0"
- + TSFILE_SUFFIX;
+ System.currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + versionController
+ .nextVersion()).toString() + IoTDBConstant.FILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
if (sequence) {
return new TsFileProcessor(storageGroupName, new File(filePath),
@@ -982,6 +984,9 @@ public class StorageGroupProcessor {
// check new tsfile
outer:
for (int i = 0; i < sequenceFileList.size(); i++) {
+ if (sequenceFileList.get(i).getFile().getName().equals(newTsFile.getName())) {
+ return;
+ }
if (i == sequenceFileList.size() - 1 && sequenceFileList.get(i).getEndTimeMap().isEmpty()) {
continue;
}
@@ -1032,7 +1037,7 @@ public class StorageGroupProcessor {
// update latest time map
updateLatestTimeMap(newTsFileResource);
- } catch (TsFileProcessorException e) {
+ } catch (TsFileProcessorException | DiskSpaceInsufficientException e) {
logger.error("Failed to append the tsfile {} to storage group processor {}.",
newTsFile.getAbsolutePath(), newTsFile.getParentFile().getName());
throw new TsFileProcessorException(e);
@@ -1072,54 +1077,87 @@ public class StorageGroupProcessor {
* @UsedBy sync module
*/
private void loadTsFileByType(int type, File tsFile, TsFileResource tsFileResource, int index)
- throws TsFileProcessorException {
+ throws TsFileProcessorException, DiskSpaceInsufficientException {
File targetFile;
if (type == -1) {
targetFile =
- new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
- .getParentFile(), IoTDBConstant.UNSEQUENCE_FLODER_NAME
- + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
- + tsFile.getName());
+ new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+ tsFile.getParentFile().getName() + File.separatorChar + tsFile.getName());
tsFileResource.setFile(targetFile);
unSequenceFileList.add(index, tsFileResource);
+ logger
+ .info("Load tsfile in unsequence list, move file from {} to {}", tsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath());
} else {
targetFile =
- new File(tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
- .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
- + tsFile.getName());
+ new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ tsFile.getParentFile().getName() + File.separatorChar + getFileNameForLoadingFile(
+ tsFile.getName(), index));
tsFileResource.setFile(targetFile);
sequenceFileList.add(index, tsFileResource);
+ logger.info("Load tsfile in sequence list, move file from {} to {}", tsFile.getAbsolutePath(),
+ targetFile.getAbsolutePath());
}
// move file from sync dir to data dir
if (!targetFile.getParentFile().exists()) {
targetFile.getParentFile().mkdirs();
}
- if (!new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
- targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()) {
- throw new TsFileProcessorException(
- String
- .format("The new .resource file {%s} to be loaded does not exist.",
- tsFile.getAbsolutePath()));
+ if (tsFile.exists() && !targetFile.exists()) {
+ try {
+ FileUtils.moveFile(tsFile, targetFile);
+ } catch (IOException e) {
+ throw new TsFileProcessorException(String.format(
+ "File renaming failed when loading tsfile. Origin: %s, Target: %s",
+ tsFile.getAbsolutePath(), targetFile.getAbsolutePath()));
+ }
}
- if (!new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()
- && !new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)
- .renameTo(new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX))) {
- throw new TsFileProcessorException(String.format(
- "File renaming failed when loading .resource file. Origin: %s, Target: %s",
- new File(tsFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(),
- new File(targetFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath()));
+ if (new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists() && !new File(
+ targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()) {
+ try {
+ FileUtils.moveFile(new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX),
+ new File(targetFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+ } catch (IOException e) {
+ throw new TsFileProcessorException(String.format(
+ "File renaming failed when loading .resource file. Origin: %s, Target: %s",
+ new File(tsFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath(),
+ new File(targetFile, TsFileResource.RESOURCE_SUFFIX).getAbsolutePath()));
+ }
}
- if (!tsFile.exists() && !targetFile.exists()) {
- throw new TsFileProcessorException(String
- .format("The new tsfile {%s} to be loaded does not exist.",
- tsFile.getAbsolutePath()));
+ }
+
+ /**
+ * Get an appropriate filename to ensure the order between files
+ *
+ * @param tsfileName origin tsfile name
+ * @param index the index to be inserted
+ * @return appropriate filename
+ */
+ private String getFileNameForLoadingFile(String tsfileName, int index) {
+ long currentTsFileTime = Long.parseLong(tsfileName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[0]);
+ long preTime;
+ if (index == 0) {
+ preTime = 0L;
+ } else {
+ String preName = sequenceFileList.get(index - 1).getFile().getName();
+ preTime = Long.parseLong(preName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[0]);
}
- if (!targetFile.exists() && !tsFile.renameTo(targetFile)) {
- throw new TsFileProcessorException(String.format(
- "File renaming failed when loading tsfile. Origin: %s, Target: %s",
- tsFile.getAbsolutePath(), targetFile.getAbsolutePath()));
+ if (index == sequenceFileList.size()) {
+ return preTime < currentTsFileTime ? tsfileName
+ : System.currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + versionController
+ .nextVersion() + IoTDBConstant.FILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+ } else {
+ String subsequenceName = sequenceFileList.get(index).getFile().getName();
+ long subsequenceTime = Long
+ .parseLong(subsequenceName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[0]);
+ long subsequenceVersion = Long
+ .parseLong(subsequenceName.split(IoTDBConstant.FILE_NAME_SEPARATOR)[1]);
+ if (preTime < currentTsFileTime && currentTsFileTime < subsequenceTime) {
+ return tsfileName;
+ } else {
+ return (preTime + ((subsequenceTime - preTime) >> 1)) + IoTDBConstant.FILE_NAME_SEPARATOR
+ + subsequenceVersion + IoTDBConstant.FILE_NAME_SEPARATOR + "0" + TSFILE_SUFFIX;
+ }
}
}
@@ -1168,7 +1206,8 @@ public class StorageGroupProcessor {
deletedTsFileResource.getMergeQueryLock().writeLock().lock();
try {
deletedTsFileResource.getFile().delete();
- new File(deletedTsFileResource.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete();
+ new File(deletedTsFileResource.getFile().getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX)
+ .delete();
} finally {
deletedTsFileResource.getMergeQueryLock().writeLock().unlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 2fa47d9..6d724ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -16,6 +16,8 @@ package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -25,6 +27,12 @@ import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.sync.sender.conf.SyncConstant;
+import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadata;
+import org.apache.iotdb.tsfile.file.metadata.TsDeviceMetadataIndex;
+import org.apache.iotdb.tsfile.file.metadata.TsFileMetaData;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +83,7 @@ public class FileLoader implements IFileLoader {
if (loadTask != null) {
try {
handleLoadTask(loadTask);
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.error("Can not load task {}", loadTask, e);
}
}
@@ -121,8 +129,12 @@ public class FileLoader implements IFileLoader {
loadLog.startLoadTsFiles();
curType = LoadType.ADD;
}
+ if (!newTsFile.exists()) {
+ LOGGER.info("Tsfile {} doesn't exist.", newTsFile.getAbsolutePath());
+ return;
+ }
TsFileResource tsFileResource = new TsFileResource(new File(newTsFile.getAbsolutePath()));
- tsFileResource.deSerialize();
+ checkTsFileResource(tsFileResource);
try {
StorageEngine.getInstance().loadNewTsFile(newTsFile, tsFileResource);
} catch (TsFileProcessorException | StorageEngineException e) {
@@ -132,6 +144,35 @@ public class FileLoader implements IFileLoader {
loadLog.finishLoadDeletedFile(newTsFile);
}
+ private void checkTsFileResource(TsFileResource tsFileResource) throws IOException {
+ if (!tsFileResource.fileExists()) {
+ // .resource file does not exist, read file metadata and recover tsfile resource
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(
+ tsFileResource.getFile().getAbsolutePath())) {
+ TsFileMetaData metaData = reader.readFileMetadata();
+ List<TsDeviceMetadataIndex> deviceMetadataIndexList = new ArrayList<>(
+ metaData.getDeviceMap().values());
+ for (TsDeviceMetadataIndex index : deviceMetadataIndexList) {
+ TsDeviceMetadata deviceMetadata = reader.readTsDeviceMetaData(index);
+ List<ChunkGroupMetaData> chunkGroupMetaDataList = deviceMetadata
+ .getChunkGroupMetaDataList();
+ for (ChunkGroupMetaData chunkGroupMetaData : chunkGroupMetaDataList) {
+ for (ChunkMetaData chunkMetaData : chunkGroupMetaData.getChunkMetaDataList()) {
+ tsFileResource.updateStartTime(chunkGroupMetaData.getDeviceID(),
+ chunkMetaData.getStartTime());
+ tsFileResource
+ .updateEndTime(chunkGroupMetaData.getDeviceID(), chunkMetaData.getEndTime());
+ }
+ }
+ }
+ }
+ // write .resource file
+ tsFileResource.serialize();
+ } else {
+ tsFileResource.deSerialize();
+ }
+ }
+
private void loadDeletedFile(File deletedTsFile) throws IOException {
if (curType != LoadType.DELETE) {
loadLog.startLoadDeletedFiles();
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
index 4f31c57..637483f 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderTest.java
@@ -20,8 +20,10 @@ package org.apache.iotdb.db.sync.receiver.load;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
@@ -83,25 +85,28 @@ public class FileLoaderTest {
@Test
public void loadNewTsfiles() throws IOException, StorageEngineException {
fileLoader = FileLoader.createFileLoader(getReceiverFolderFile());
- Map<String, Set<File>> allFileList = new HashMap<>();
- Map<String, Set<File>> correctSequenceLoadedFileMap = new HashMap<>();
+ Map<String, List<File>> allFileList = new HashMap<>();
+ Map<String, Set<String>> correctSequenceLoadedFileMap = new HashMap<>();
// add some new tsfiles
Random r = new Random(0);
+ long time = System.currentTimeMillis();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 10; j++) {
- allFileList.putIfAbsent(SG_NAME + i, new HashSet<>());
+ allFileList.putIfAbsent(SG_NAME + i, new ArrayList<>());
correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile";
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100
+ + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
File syncFile = new File(fileName);
File dataFile = new File(
syncFile.getParentFile().getParentFile().getParentFile().getParentFile()
.getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
+ File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar
+ syncFile.getName());
- correctSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile);
+ correctSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile.getAbsolutePath());
allFileList.get(SG_NAME + i).add(syncFile);
if (!syncFile.getParentFile().exists()) {
syncFile.getParentFile().mkdirs();
@@ -128,7 +133,7 @@ public class FileLoaderTest {
}
assert getReceiverFolderFile().exists();
- for (Set<File> set : allFileList.values()) {
+ for (List<File> set : allFileList.values()) {
for (File newTsFile : set) {
if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
fileLoader.addTsfile(newTsFile);
@@ -150,19 +155,19 @@ public class FileLoaderTest {
}
assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists();
- Map<String, Set<File>> sequenceLoadedFileMap = new HashMap<>();
+ Map<String, Set<String>> sequenceLoadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i);
sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assert processor.getSequenceFileList().size() == 10;
for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
- sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile());
+ sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath());
}
assert processor.getUnSequenceFileList().isEmpty();
}
assert sequenceLoadedFileMap.size() == correctSequenceLoadedFileMap.size();
- for (Entry<String, Set<File>> entry : correctSequenceLoadedFileMap.entrySet()) {
+ for (Entry<String, Set<String>> entry : correctSequenceLoadedFileMap.entrySet()) {
String sg = entry.getKey();
assert entry.getValue().size() == sequenceLoadedFileMap.get(sg).size();
assert entry.getValue().containsAll(sequenceLoadedFileMap.get(sg));
@@ -171,23 +176,26 @@ public class FileLoaderTest {
// add some overlap new tsfiles
fileLoader = FileLoader.createFileLoader(getReceiverFolderFile());
- Map<String, Set<File>> correctUnSequenceLoadedFileMap = new HashMap<>();
+ Map<String, Set<String>> correctUnSequenceLoadedFileMap = new HashMap<>();
allFileList = new HashMap<>();
r = new Random(1);
+ time = System.currentTimeMillis();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 10; j++) {
- allFileList.putIfAbsent(SG_NAME + i, new HashSet<>());
+ allFileList.putIfAbsent(SG_NAME + i, new ArrayList<>());
correctUnSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile";
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100
+ + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
File syncFile = new File(fileName);
File dataFile = new File(
syncFile.getParentFile().getParentFile().getParentFile().getParentFile()
.getParentFile(), IoTDBConstant.UNSEQUENCE_FLODER_NAME
+ File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar
+ syncFile.getName());
- correctUnSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile);
+ correctUnSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile.getAbsolutePath());
allFileList.get(SG_NAME + i).add(syncFile);
if (!syncFile.getParentFile().exists()) {
syncFile.getParentFile().mkdirs();
@@ -214,7 +222,7 @@ public class FileLoaderTest {
}
assert getReceiverFolderFile().exists();
- for (Set<File> set : allFileList.values()) {
+ for (List<File> set : allFileList.values()) {
for (File newTsFile : set) {
if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
fileLoader.addTsfile(newTsFile);
@@ -242,30 +250,30 @@ public class FileLoaderTest {
sequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assert processor.getSequenceFileList().size() == 10;
for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
- sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile());
+ sequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath());
}
assert !processor.getUnSequenceFileList().isEmpty();
}
assert sequenceLoadedFileMap.size() == correctSequenceLoadedFileMap.size();
- for (Entry<String, Set<File>> entry : correctSequenceLoadedFileMap.entrySet()) {
+ for (Entry<String, Set<String>> entry : correctSequenceLoadedFileMap.entrySet()) {
String sg = entry.getKey();
assert entry.getValue().size() == sequenceLoadedFileMap.get(sg).size();
assert entry.getValue().containsAll(sequenceLoadedFileMap.get(sg));
}
- Map<String, Set<File>> unsequenceLoadedFileMap = new HashMap<>();
+ Map<String, Set<String>> unsequenceLoadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i);
unsequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assert processor.getUnSequenceFileList().size() == 10;
for (TsFileResource tsFileResource : processor.getUnSequenceFileList()) {
- unsequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile());
+ unsequenceLoadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath());
}
}
assert unsequenceLoadedFileMap.size() == correctUnSequenceLoadedFileMap.size();
- for (Entry<String, Set<File>> entry : correctUnSequenceLoadedFileMap.entrySet()) {
+ for (Entry<String, Set<String>> entry : correctUnSequenceLoadedFileMap.entrySet()) {
String sg = entry.getKey();
assert entry.getValue().size() == unsequenceLoadedFileMap.get(sg).size();
assert entry.getValue().containsAll(unsequenceLoadedFileMap.get(sg));
@@ -273,27 +281,28 @@ public class FileLoaderTest {
}
@Test
- public void loadDeletedFileName() throws IOException, StorageEngineException {
+ public void loadDeletedFileName() throws IOException, StorageEngineException, InterruptedException {
fileLoader = FileLoader.createFileLoader(getReceiverFolderFile());
- Map<String, Set<File>> allFileList = new HashMap<>();
- Map<String, Set<File>> correctLoadedFileMap = new HashMap<>();
+ Map<String, List<File>> allFileList = new HashMap<>();
+ Map<String, Set<String>> correctLoadedFileMap = new HashMap<>();
// add some tsfiles
Random r = new Random(0);
+ long time = System.currentTimeMillis();
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 25; j++) {
- allFileList.putIfAbsent(SG_NAME + i, new HashSet<>());
+ allFileList.putIfAbsent(SG_NAME + i, new ArrayList<>());
correctLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile";
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + (time + i * 100
+ + j) + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
File syncFile = new File(fileName);
File dataFile = new File(
- syncFile.getParentFile().getParentFile().getParentFile().getParentFile()
- .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar
- + syncFile.getName());
- correctLoadedFileMap.get(SG_NAME + i).add(dataFile);
+ DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ syncFile.getParentFile().getName() + File.separatorChar + syncFile.getName());
+ correctLoadedFileMap.get(SG_NAME + i).add(dataFile.getAbsolutePath());
allFileList.get(SG_NAME + i).add(syncFile);
if (!syncFile.getParentFile().exists()) {
syncFile.getParentFile().mkdirs();
@@ -318,7 +327,7 @@ public class FileLoaderTest {
}
assert getReceiverFolderFile().exists();
- for (Set<File> set : allFileList.values()) {
+ for (List<File> set : allFileList.values()) {
for (File newTsFile : set) {
if (!newTsFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
fileLoader.addTsfile(newTsFile);
@@ -340,19 +349,19 @@ public class FileLoaderTest {
}
assert !new File(getReceiverFolderFile(), SyncConstant.RECEIVER_DATA_FOLDER_NAME).exists();
- Map<String, Set<File>> loadedFileMap = new HashMap<>();
+ Map<String, Set<String>> loadedFileMap = new HashMap<>();
for (int i = 0; i < 3; i++) {
StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i);
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
assert processor.getSequenceFileList().size() == 25;
for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
- loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile());
+ loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath());
}
assert processor.getUnSequenceFileList().isEmpty();
}
assert loadedFileMap.size() == correctLoadedFileMap.size();
- for (Entry<String, Set<File>> entry : correctLoadedFileMap.entrySet()) {
+ for (Entry<String, Set<String>> entry : correctLoadedFileMap.entrySet()) {
String sg = entry.getKey();
assert entry.getValue().size() == loadedFileMap.get(sg).size();
assert entry.getValue().containsAll(loadedFileMap.get(sg));
@@ -360,18 +369,16 @@ public class FileLoaderTest {
// delete some tsfiles
fileLoader = FileLoader.createFileLoader(getReceiverFolderFile());
- for(Entry<String, Set<File>> entry:allFileList.entrySet()){
+ for(Entry<String, List<File>> entry:allFileList.entrySet()){
String sg = entry.getKey();
- Set<File> files = entry.getValue();
+ List<File> files = entry.getValue();
int cnt = 0;
for(File snapFile:files){
- if(!snapFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)){
+ if (!snapFile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
File dataFile = new File(
- snapFile.getParentFile().getParentFile().getParentFile().getParentFile()
- .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separatorChar + snapFile.getParentFile().getName() + File.separatorChar
- + snapFile.getName());
- correctLoadedFileMap.get(sg).remove(dataFile);
+ DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ snapFile.getParentFile().getName() + File.separatorChar + snapFile.getName());
+ correctLoadedFileMap.get(sg).remove(dataFile.getAbsolutePath());
snapFile.delete();
fileLoader.addDeletedFileName(snapFile);
new File(snapFile + TsFileResource.RESOURCE_SUFFIX).delete();
@@ -395,18 +402,18 @@ public class FileLoaderTest {
LOGGER.error("Fail to wait for loading new tsfiles", e);
}
- loadedFileMap = new HashMap<>();
+ loadedFileMap.clear();
for (int i = 0; i < 3; i++) {
StorageGroupProcessor processor = StorageEngine.getInstance().getProcessor(SG_NAME + i);
loadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
for (TsFileResource tsFileResource : processor.getSequenceFileList()) {
- loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile());
+ loadedFileMap.get(SG_NAME + i).add(tsFileResource.getFile().getAbsolutePath());
}
assert processor.getUnSequenceFileList().isEmpty();
}
assert loadedFileMap.size() == correctLoadedFileMap.size();
- for (Entry<String, Set<File>> entry : correctLoadedFileMap.entrySet()) {
+ for (Entry<String, Set<String>> entry : correctLoadedFileMap.entrySet()) {
String sg = entry.getKey();
assert entry.getValue().size() == loadedFileMap.get(sg).size();
assert entry.getValue().containsAll(loadedFileMap.get(sg));
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
index f2de4d0..4743059 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzerTest.java
@@ -105,16 +105,17 @@ public class SyncReceiverLogAnalyzerTest {
correctSequenceLoadedFileMap.putIfAbsent(SG_NAME + i, new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
- getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + rand + ".tsfile";
+ getSnapshotFolder() + File.separator + SG_NAME + i + File.separator + System
+ .currentTimeMillis() + IoTDBConstant.FILE_NAME_SEPARATOR + rand
+ + IoTDBConstant.FILE_NAME_SEPARATOR + "0.tsfile";
File syncFile = new File(fileName);
receiverLogger
.finishSyncTsfile(syncFile);
toBeSyncedFiles.add(syncFile.getAbsolutePath());
File dataFile = new File(
- syncFile.getParentFile().getParentFile().getParentFile().getParentFile()
- .getParentFile(), IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separatorChar + syncFile.getParentFile().getName() + File.separatorChar
- + syncFile.getName());
+ DirectoryManager.getInstance().getNextFolderForSequenceFile(),
+ syncFile.getParentFile().getName() + File.separatorChar
+ + syncFile.getName());
correctSequenceLoadedFileMap.get(SG_NAME + i).add(dataFile);
allFileList.get(SG_NAME + i).add(syncFile);
if (!syncFile.getParentFile().exists()) {
@@ -154,13 +155,12 @@ public class SyncReceiverLogAnalyzerTest {
assert new File(getReceiverFolderFile(), SyncConstant.LOAD_LOG_NAME).exists();
assert new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME).exists();
assert FileLoaderManager.getInstance().containsFileLoader(getReceiverFolderFile().getName());
- int count = 0, mode = 0;
+ int mode = 0;
Set<String> toBeSyncedFilesTest = new HashSet<>();
try (BufferedReader br = new BufferedReader(
new FileReader(new File(getReceiverFolderFile(), SyncConstant.SYNC_LOG_NAME)))) {
String line;
while ((line = br.readLine()) != null) {
- count++;
if (line.equals(SyncReceiverLogger.SYNC_DELETED_FILE_NAME_START)) {
mode = -1;
} else if (line.equals(SyncReceiverLogger.SYNC_TSFILE_START)) {