You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/01/26 06:46:44 UTC
[iotdb] 01/01: fix sync bug for tsfiles's directory changed by
vitural storage group
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch fix_sync_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit de8f140afd35a2fe5ef22a1331ed4a819b8b5d9f
Author: chaow <xu...@gmail.com>
AuthorDate: Tue Jan 26 14:45:50 2021 +0800
fix sync bug for tsfiles's directory changed by vitural storage group
---
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../db/sync/sender/manage/ISyncFileManager.java | 11 +-
.../db/sync/sender/manage/SyncFileManager.java | 137 +++++++++++++--------
.../iotdb/db/sync/sender/transfer/ISyncClient.java | 3 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 47 ++++---
.../java/org/apache/iotdb/db/utils/SyncUtils.java | 12 +-
.../db/sync/sender/manage/SyncFileManagerTest.java | 116 +++++++++--------
.../sender/recover/SyncSenderLogAnalyzerTest.java | 51 +++++---
8 files changed, 228 insertions(+), 153 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1170733..e25b5a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -773,7 +773,7 @@ public class StorageEngine implements IService {
public void loadNewTsFileForSync(TsFileResource newTsFileResource)
throws StorageEngineException, LoadFileException, IllegalPathException {
- getProcessorDirectly(new PartialPath(getSgByEngineFile(newTsFileResource.getTsFile())))
+ getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
.loadNewTsFileForSync(newTsFileResource);
}
@@ -791,7 +791,7 @@ public class StorageEngine implements IService {
public boolean deleteTsfileForSync(File deletedTsfile)
throws StorageEngineException, IllegalPathException {
- return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
+ return getProcessorDirectly(new PartialPath(deletedTsfile.getParentFile().getName()))
.deleteTsfile(deletedTsfile);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index 9fd9371..bfd9d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -55,13 +55,14 @@ public interface ISyncFileManager {
*/
void getValidFiles(String dataDir) throws IOException;
- Map<String, Map<Long, Set<File>>> getCurrentSealedLocalFilesMap();
- Map<String, Map<Long, Set<File>>> getLastLocalFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> getCurrentSealedLocalFilesMap();
- Map<String, Map<Long, Set<File>>> getDeletedFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> getLastLocalFilesMap();
- Map<String, Map<Long, Set<File>>> getToBeSyncedFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> getDeletedFilesMap();
- Map<String, Set<Long>> getAllSGs();
+ Map<String, Map<Long, Map<Long, Set<File>>>> getToBeSyncedFilesMap();
+
+ Map<String, Map<Long, Set<Long>>> getAllSGs();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 0a41280..163daea 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -47,33 +47,35 @@ public class SyncFileManager implements ISyncFileManager {
/**
* All storage groups on the disk where the current sync task is executed
+ * logicalSg -> <virtualSg, timeRangeId>
*/
- private Map<String, Set<Long>> allSGs;
+ private Map<String, Map<Long, Set<Long>>> allSGs;
/**
* Key is storage group, value is all sealed tsfiles in the storage group. Inner key is time range
* id, inner value is the set of current sealed tsfiles.
+ * logicalSg -> virtualSg -> <timeRangeId, files>
*/
- private Map<String, Map<Long, Set<File>>> currentSealedLocalFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> currentSealedLocalFilesMap;
/**
* Key is storage group, value is all last local tsfiles in the storage group, which doesn't
* contains those tsfiles which are not synced successfully. Inner key is time range id, inner
* value is the set of last local tsfiles.
*/
- private Map<String, Map<Long, Set<File>>> lastLocalFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap;
/**
* Key is storage group, value is all deleted tsfiles which need to be synced to receiver end in
* the storage group. Inner key is time range id, inner value is the valid set of sealed tsfiles.
*/
- private Map<String, Map<Long, Set<File>>> deletedFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap;
/**
* Key is storage group, value is all new tsfiles which need to be synced to receiver end in the
* storage group. Inner key is time range id, inner value is the valid set of new tsfiles.
*/
- private Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap;
private SyncFileManager() {
IoTDB.metaManager.init();
@@ -90,7 +92,7 @@ public class SyncFileManager implements ISyncFileManager {
currentSealedLocalFilesMap = new HashMap<>();
// get all files in data dir sequence folder
- Map<String, Map<Long, Set<File>>> currentAllLocalFiles = new HashMap<>();
+ Map<String, Map<Long, Map<Long, Set<File>>>> currentAllLocalFiles = new HashMap<>();
if (!new File(dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME).exists()) {
return;
}
@@ -102,35 +104,49 @@ public class SyncFileManager implements ISyncFileManager {
.equals(TsFileConstant.TMP_SUFFIX)) {
continue;
}
- allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>());
+ allSGs.putIfAbsent(sgFolder.getName(), new HashMap<>());
currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>());
- for (File timeRangeFolder : sgFolder.listFiles()) {
- try {
- Long timeRangeId = Long.parseLong(timeRangeFolder.getName());
- currentAllLocalFiles.get(sgFolder.getName()).putIfAbsent(timeRangeId, new HashSet<>());
- File[] files = timeRangeFolder.listFiles();
- Arrays.stream(files)
- .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()).get(timeRangeId)
- .add(new File(timeRangeFolder.getAbsolutePath(), file.getName())));
- } catch (Exception e) {
- LOGGER.error("Invalid time range folder: {}", timeRangeFolder.getAbsolutePath(), e);
- }
+ for (File virtualSgFolder : sgFolder.listFiles()) {
+ try {
+ Long vgId = Long.parseLong(virtualSgFolder.getName());
+ allSGs.get(sgFolder.getName()).putIfAbsent(vgId, new HashSet<>());
+ currentAllLocalFiles.get(sgFolder.getName()).putIfAbsent(vgId, new HashMap<>());
+
+ for (File timeRangeFolder : virtualSgFolder.listFiles()) {
+ try {
+ Long timeRangeId = Long.parseLong(timeRangeFolder.getName());
+ currentAllLocalFiles.get(sgFolder.getName()).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+ File[] files = timeRangeFolder.listFiles();
+ Arrays.stream(files)
+ .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()).get(vgId).get(timeRangeId)
+ .add(new File(timeRangeFolder.getAbsolutePath(), file.getName())));
+ } catch (Exception e) {
+ LOGGER.error("Invalid time range folder: {}", timeRangeFolder.getAbsolutePath(), e);
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Invalid virtual storage group folder: {}", virtualSgFolder.getAbsolutePath(), e);
+ }
}
}
// get sealed tsfiles
- for (Entry<String, Map<Long, Set<File>>> entry : currentAllLocalFiles.entrySet()) {
+ for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : currentAllLocalFiles.entrySet()) {
String sgName = entry.getKey();
currentSealedLocalFilesMap.putIfAbsent(sgName, new HashMap<>());
- for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
- Long timeRangeId = innerEntry.getKey();
- currentSealedLocalFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
- for (File file : innerEntry.getValue()) {
- if (!file.getName().endsWith(TSFILE_SUFFIX)) {
- continue;
- }
- if (checkFileValidity(file)) {
- currentSealedLocalFilesMap.get(sgName).get(timeRangeId).add(file);
+ for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+ Long vgId = vgEntry.getKey();
+ currentSealedLocalFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>());
+ for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+ Long timeRangeId = innerEntry.getKey();
+ currentSealedLocalFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+ for (File file : innerEntry.getValue()) {
+ if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+ continue;
+ }
+ if (checkFileValidity(file)) {
+ currentSealedLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).add(file);
+ }
}
}
}
@@ -157,10 +173,13 @@ public class SyncFileManager implements ISyncFileManager {
while ((filePath = reader.readLine()) != null) {
File file = new File(filePath);
Long timeRangeId = Long.parseLong(file.getParentFile().getName());
- String sgName = file.getParentFile().getParentFile().getName();
- allSGs.putIfAbsent(sgName, new HashSet<>());
+ Long vgId = Long.parseLong(file.getParentFile().getParentFile().getName());
+ String sgName = file.getParentFile().getParentFile().getParentFile().getName();
+ allSGs.putIfAbsent(sgName, new HashMap<>());
+ allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>());
lastLocalFilesMap.computeIfAbsent(sgName, k -> new HashMap<>())
- .computeIfAbsent(timeRangeId, k -> new HashSet<>()).add(file);
+ .computeIfAbsent(vgId, k -> new HashMap<>())
+ .computeIfAbsent(timeRangeId, k -> new HashSet<>()).add(file);
}
}
}
@@ -177,28 +196,44 @@ public class SyncFileManager implements ISyncFileManager {
for (String sgName : allSGs.keySet()) {
toBeSyncedFilesMap.putIfAbsent(sgName, new HashMap<>());
deletedFilesMap.putIfAbsent(sgName, new HashMap<>());
- for (Entry<Long, Set<File>> entry : currentSealedLocalFilesMap
- .getOrDefault(sgName, Collections.emptyMap()).entrySet()) {
- Long timeRangeId = entry.getKey();
- toBeSyncedFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
- allSGs.get(sgName).add(timeRangeId);
- for (File newFile : entry.getValue()) {
- if (!lastLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+
+ for (Entry<Long, Map<Long, Set<File>>> entry : currentSealedLocalFilesMap
+ .getOrDefault(sgName, Collections.emptyMap()).entrySet()) {
+ Long vgId = entry.getKey();
+ toBeSyncedFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>());
+ allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>());
+
+ for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
+ Long timeRangeId = innerEntry.getKey();
+ toBeSyncedFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+ allSGs.get(sgName).get(vgId).add(timeRangeId);
+ for (File newFile : innerEntry.getValue()) {
+ if (!lastLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+ .getOrDefault(vgId, Collections.emptyMap())
.getOrDefault(timeRangeId, Collections.emptySet()).contains(newFile)) {
- toBeSyncedFilesMap.get(sgName).get(timeRangeId).add(newFile);
+ toBeSyncedFilesMap.get(sgName).get(vgId).get(timeRangeId).add(newFile);
+ }
}
}
}
- for (Entry<Long, Set<File>> entry : lastLocalFilesMap
+ for (Entry<Long, Map<Long, Set<File>>> entry : lastLocalFilesMap
.getOrDefault(sgName, Collections.emptyMap()).entrySet()) {
- Long timeRangeId = entry.getKey();
- deletedFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
- allSGs.get(sgName).add(timeRangeId);
- for (File oldFile : entry.getValue()) {
- if (!currentSealedLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+ Long vgId = entry.getKey();
+ deletedFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>());
+ allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>());
+
+ for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
+ Long timeRangeId = innerEntry.getKey();
+ deletedFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+ allSGs.get(sgName).get(vgId).add(timeRangeId);
+
+ for (File oldFile : innerEntry.getValue()) {
+ if (!currentSealedLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+ .getOrDefault(vgId, Collections.emptyMap())
.getOrDefault(timeRangeId, Collections.emptySet()).contains(oldFile)) {
- deletedFilesMap.get(sgName).get(timeRangeId).add(oldFile);
+ deletedFilesMap.get(sgName).get(vgId).get(timeRangeId).add(oldFile);
+ }
}
}
}
@@ -206,27 +241,27 @@ public class SyncFileManager implements ISyncFileManager {
}
@Override
- public Map<String, Map<Long, Set<File>>> getCurrentSealedLocalFilesMap() {
+ public Map<String, Map<Long, Map<Long, Set<File>>>> getCurrentSealedLocalFilesMap() {
return currentSealedLocalFilesMap;
}
@Override
- public Map<String, Map<Long, Set<File>>> getLastLocalFilesMap() {
+ public Map<String, Map<Long, Map<Long, Set<File>>>> getLastLocalFilesMap() {
return lastLocalFilesMap;
}
@Override
- public Map<String, Map<Long, Set<File>>> getDeletedFilesMap() {
+ public Map<String, Map<Long, Map<Long, Set<File>>>> getDeletedFilesMap() {
return deletedFilesMap;
}
@Override
- public Map<String, Map<Long, Set<File>>> getToBeSyncedFilesMap() {
+ public Map<String, Map<Long, Map<Long, Set<File>>>> getToBeSyncedFilesMap() {
return toBeSyncedFilesMap;
}
@Override
- public Map<String, Set<Long>> getAllSGs() {
+ public Map<String, Map<Long, Set<Long>>> getAllSGs() {
return allSGs;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
index 15323bc..1d501ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
@@ -92,10 +92,11 @@ public interface ISyncClient {
* receiver.
*
* @param sgName storage group name
+ * @param vgId virtual group id
* @param timeRangeId id of time range
* @param toBeSyncFiles list of new tsfile names
*/
- void syncDataFilesInOneGroup(String sgName, Long timeRangeId, Set<File> toBeSyncFiles)
+ void syncDataFilesInOneGroup(String sgName, Long vgId, Long timeRangeId, Set<File> toBeSyncFiles)
throws SyncConnectionException, IOException, SyncDeviceOwnerConflictException;
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index b88c0de..71ede28 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -105,13 +105,13 @@ public class SyncClient implements ISyncClient {
private SyncService.Client serviceClient;
- private Map<String, Set<Long>> allSG;
+ private Map<String, Map<Long, Set<Long>>> allSG;
- private Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap;
- private Map<String, Map<Long, Set<File>>> deletedFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap;
- private Map<String, Map<Long, Set<File>>> lastLocalFilesMap;
+ private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap;
/**
* If true, sync is in execution.
@@ -454,7 +454,7 @@ public class SyncClient implements ISyncClient {
syncStatus = true;
List<String> storageGroups = config.getStorageGroupList();
- for (Entry<String, Set<Long>> entry : allSG.entrySet()) {
+ for (Entry<String, Map<Long, Set<Long>>> entry : allSG.entrySet()) {
String sgName = entry.getKey();
if (!storageGroups.isEmpty() && !storageGroups.contains(sgName)) {
continue;
@@ -470,17 +470,22 @@ public class SyncClient implements ISyncClient {
throw new SyncConnectionException("Unable to connect to receiver", e);
}
logger.info(
- "Sync process starts to transfer data of storage group {}, it has {} time ranges.",
+ "Sync process starts to transfer data of storage group {}, it has {} virtual storage group.",
sgName, entry.getValue().size());
try {
- for (Long timeRangeId : entry.getValue()) {
- lastLocalFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
- syncDeletedFilesNameInOneGroup(sgName, timeRangeId,
+ for (Entry<Long, Set<Long>> vgEntry : entry.getValue().entrySet()) {
+ lastLocalFilesMap.get(sgName).putIfAbsent(vgEntry.getKey(), new HashMap<>());
+ for (Long timeRangeId : vgEntry.getValue()) {
+ lastLocalFilesMap.get(sgName).get(vgEntry.getKey()).putIfAbsent(timeRangeId, new HashSet<>());
+ syncDeletedFilesNameInOneGroup(sgName, timeRangeId,
deletedFilesMap.getOrDefault(sgName, Collections.emptyMap())
- .getOrDefault(timeRangeId, Collections.emptySet()));
- syncDataFilesInOneGroup(sgName, timeRangeId,
+ .getOrDefault(vgEntry.getKey(), Collections.emptyMap())
+ .getOrDefault(timeRangeId, Collections.emptySet()));
+ syncDataFilesInOneGroup(sgName, vgEntry.getKey(), timeRangeId,
toBeSyncedFilesMap.getOrDefault(sgName, Collections.emptyMap())
- .getOrDefault(timeRangeId, Collections.emptySet()));
+ .getOrDefault(vgEntry.getKey(), Collections.emptyMap())
+ .getOrDefault(timeRangeId, Collections.emptySet()));
+ }
}
} catch (SyncDeviceOwnerConflictException e) {
deletedFilesMap.remove(sgName);
@@ -528,7 +533,7 @@ public class SyncClient implements ISyncClient {
}
@Override
- public void syncDataFilesInOneGroup(String sgName, Long timeRangeId, Set<File> toBeSyncFiles)
+ public void syncDataFilesInOneGroup(String sgName, Long vgId, Long timeRangeId, Set<File> toBeSyncFiles)
throws SyncConnectionException, IOException, SyncDeviceOwnerConflictException {
if (toBeSyncFiles.isEmpty()) {
logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
@@ -544,7 +549,7 @@ public class SyncClient implements ISyncClient {
// firstly sync .resource file, then sync tsfile
syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
syncSingleFile(snapshotFile);
- lastLocalFilesMap.get(sgName).get(timeRangeId).add(tsfile);
+ lastLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).add(tsfile);
syncLog.finishSyncTsfile(tsfile);
logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
} catch (IOException e) {
@@ -638,13 +643,15 @@ public class SyncClient implements ISyncClient {
// 1. Write file list to currentLocalFile
try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
- for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
- for (Set<File> files : currentLocalFiles.values()) {
- for (File file : files) {
- bw.write(file.getAbsolutePath());
- bw.newLine();
+ for (Map<Long, Map<Long, Set<File>>> vgCurrentLocalFiles : lastLocalFilesMap.values()) {
+ for (Map<Long, Set<File>> currentLocalFiles : vgCurrentLocalFiles.values()) {
+ for (Set<File> files : currentLocalFiles.values()) {
+ for (File file : files) {
+ bw.write(file.getAbsolutePath());
+ bw.newLine();
+ }
+ bw.flush();
}
- bw.flush();
}
}
} catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 2401a01..e5446a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -49,11 +49,13 @@ public class SyncUtils {
/**
* Verify sending list is empty or not It's used by sync sender.
*/
- public static boolean isEmpty(Map<String, Map<Long, Set<File>>> sendingFileList) {
- for (Entry<String, Map<Long, Set<File>>> entry: sendingFileList.entrySet()) {
- for(Entry<Long, Set<File>> innerEntry: entry.getValue().entrySet()) {
- if (!innerEntry.getValue().isEmpty()) {
- return false;
+ public static boolean isEmpty(Map<String, Map<Long, Map<Long, Set<File>>>> sendingFileList) {
+ for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry: sendingFileList.entrySet()) {
+ for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+ for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+ if (!innerEntry.getValue().isEmpty()) {
+ return false;
+ }
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
index e742d57..c82f346 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
@@ -71,7 +71,7 @@ public class SyncFileManagerTest {
@Test
public void testGetValidFiles() throws IOException, MetadataException {
- Map<String, Map<Long, Set<File>>> allFileList = new HashMap<>();
+ Map<String, Map<Long, Map<Long, Set<File>>>> allFileList = new HashMap<>();
Random r = new Random(0);
for (int i = 0; i < 3; i++) {
@@ -80,12 +80,13 @@ public class SyncFileManagerTest {
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashMap<>())
.computeIfAbsent(0L, k -> new HashSet<>());
String rand = r.nextInt(10000) + TSFILE_SUFFIX;
String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand;
+ + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand;
File file = new File(fileName);
- allFileList.get(getSgName(i)).get(0L).add(file);
+ allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
@@ -106,25 +107,27 @@ public class SyncFileManagerTest {
updateLastLocalFiles(allFileList);
manager.getValidFiles(dataDir);
- Map<String, Map<Long, Set<File>>> lastFileMap = manager.getLastLocalFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> lastFileMap = manager.getLastLocalFilesMap();
assertFileMap(allFileList, lastFileMap);
// add some files
- Map<String, Map<Long, Set<File>>> correctToBeSyncedFiles = new HashMap<>();
+ Map<String, Map<Long, Map<Long, Set<File>>>> correctToBeSyncedFiles = new HashMap<>();
r = new Random(1);
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashMap<>())
.computeIfAbsent(0L, k -> new HashSet<>());
correctToBeSyncedFiles.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashMap<>())
.computeIfAbsent(0L, k -> new HashSet<>());
String rand = r.nextInt(10000) + TSFILE_SUFFIX;
String fileName =
FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand;
+ + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand;
File file = new File(fileName);
- allFileList.get(getSgName(i)).get(0L).add(file);
- correctToBeSyncedFiles.get(getSgName(i)).get(0L).add(file);
+ allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
+ correctToBeSyncedFiles.get(getSgName(i)).get(0L).get(0L).add(file);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
@@ -138,8 +141,8 @@ public class SyncFileManagerTest {
}
}
manager.getValidFiles(dataDir);
- Map<String, Map<Long, Set<File>>> curFileMap = manager.getCurrentSealedLocalFilesMap();
- Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> curFileMap = manager.getCurrentSealedLocalFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
assertFileMap(allFileList, curFileMap);
assertFileMap(correctToBeSyncedFiles, toBeSyncedFilesMap);
@@ -155,17 +158,19 @@ public class SyncFileManagerTest {
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
- .computeIfAbsent(0L, k -> new HashSet<>());
+ .computeIfAbsent(0L, k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashSet<>());
correctToBeSyncedFiles.computeIfAbsent(getSgName(i), k -> new HashMap<>())
- .computeIfAbsent(0L, k -> new HashSet<>());
+ .computeIfAbsent(0L, k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashSet<>());
String rand = r.nextInt(10000) + TSFILE_SUFFIX;
String fileName =
FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separator + getSgName(i) + File.separator + "0" + File.separator
+ + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator
+ File.separator + rand;
File file = new File(fileName);
- allFileList.get(getSgName(i)).get(0L).add(file);
- correctToBeSyncedFiles.get(getSgName(i)).get(0L).add(file);
+ allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
+ correctToBeSyncedFiles.get(getSgName(i)).get(0L).get(0L).add(file);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
@@ -179,35 +184,41 @@ public class SyncFileManagerTest {
}
}
int count = 0;
- Map<String, Map<Long, Set<File>>> correctDeleteFile = new HashMap<>();
- for (Entry<String, Map<Long, Set<File>>> entry : allFileList.entrySet()) {
+ Map<String, Map<Long, Map<Long, Set<File>>>> correctDeleteFile = new HashMap<>();
+ for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : allFileList.entrySet()) {
correctDeleteFile.put(entry.getKey(), new HashMap<>());
- for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
- Set<File> files = innerEntry.getValue();
- correctDeleteFile.get(entry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
- for (File file : files) {
- count++;
- if (count % 3 == 0 && lastFileMap.get(entry.getKey()).get(0L).contains(file)) {
- correctDeleteFile.get(entry.getKey()).get(0L).add(file);
+ for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+ correctDeleteFile.get(entry.getKey()).putIfAbsent(vgEntry.getKey(), new HashMap<>());
+ for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+ Set<File> files = innerEntry.getValue();
+ correctDeleteFile.get(entry.getKey()).get(vgEntry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
+ for (File file : files) {
+ count++;
+ if (count % 3 == 0 && lastFileMap.get(entry.getKey()).get(0L).get(0L).contains(file)) {
+ correctDeleteFile.get(entry.getKey()).get(0L).get(0L).add(file);
+ }
}
}
}
}
- for (Entry<String, Map<Long, Set<File>>> entry : correctDeleteFile.entrySet()) {
+ for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctDeleteFile.entrySet()) {
correctDeleteFile.put(entry.getKey(), new HashMap<>());
- for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
- Set<File> files = innerEntry.getValue();
- correctDeleteFile.get(entry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
- for (File file : innerEntry.getValue()) {
- file.delete();
- new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete();
- allFileList.get(entry.getKey()).get(0L).remove(file);
+ for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+ correctDeleteFile.get(entry.getKey()).putIfAbsent(vgEntry.getKey(), new HashMap<>());
+ for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+ Set<File> files = innerEntry.getValue();
+ correctDeleteFile.get(entry.getKey()).get(vgEntry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
+ for (File file : innerEntry.getValue()) {
+ file.delete();
+ new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete();
+ allFileList.get(entry.getKey()).get(0L).get(0L).remove(file);
+ }
}
}
}
manager.getValidFiles(dataDir);
curFileMap = manager.getCurrentSealedLocalFilesMap();
- Map<String, Map<Long, Set<File>>> deletedFilesMap = manager.getDeletedFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap = manager.getDeletedFilesMap();
toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
assertFileMap(allFileList, curFileMap);
assertFileMap(correctDeleteFile, deletedFilesMap);
@@ -218,14 +229,15 @@ public class SyncFileManagerTest {
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
- .computeIfAbsent(0L, k -> new HashSet<>());
+ .computeIfAbsent(0L, k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashSet<>());
String rand = String.valueOf(r.nextInt(10000));
String fileName =
FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separator + getSgName(i) + File.separator + "0" + File.separator
+ + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator
+ File.separator + rand;
File file = new File(fileName);
- allFileList.get(getSgName(i)).get(0L).add(file);
+ allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
@@ -245,13 +257,17 @@ public class SyncFileManagerTest {
assertFileMap(correctToBeSyncedFiles, toBeSyncedFilesMap);
}
- private void assertFileMap(Map<String, Map<Long, Set<File>>> correctMap,
- Map<String, Map<Long, Set<File>>> curMap) {
- for (Entry<String, Map<Long, Set<File>>> entry : correctMap.entrySet()) {
+ private void assertFileMap(Map<String, Map<Long, Map<Long, Set<File>>>> correctMap,
+ Map<String, Map<Long, Map<Long, Set<File>>>> curMap) {
+ for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctMap.entrySet()) {
assertTrue(curMap.containsKey(entry.getKey()));
- for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
- assertTrue(
- curMap.get(entry.getKey()).get(innerEntry.getKey()).containsAll(innerEntry.getValue()));
+ for (Entry<Long, Map<Long, Set<File>>> innerEntry : entry.getValue().entrySet()) {
+ assertTrue(curMap.get(entry.getKey()).containsKey(innerEntry.getKey()));
+ for (Entry<Long, Set<File>> fileEntry : innerEntry.getValue().entrySet()) {
+ assertTrue(
+ curMap.get(entry.getKey()).get(innerEntry.getKey())
+ .get(fileEntry.getKey()).containsAll(fileEntry.getValue()));
+ }
}
}
}
@@ -260,16 +276,18 @@ public class SyncFileManagerTest {
return IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + i;
}
- private void updateLastLocalFiles(Map<String, Map<Long, Set<File>>> lastLocalFilesMap) {
+ private void updateLastLocalFiles(Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap) {
try (BufferedWriter bw = new BufferedWriter(
new FileWriter(new File(config.getLastFileInfoPath())))) {
- for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
- for (Set<File> files : currentLocalFiles.values()) {
- for (File file : files) {
- bw.write(file.getAbsolutePath());
- bw.newLine();
+ for (Map<Long, Map<Long, Set<File>>> currentLocalFiles : lastLocalFilesMap.values()) {
+ for (Map<Long, Set<File>> vgFiles : currentLocalFiles.values()) {
+ for (Set<File> files : vgFiles.values()) {
+ for (File file : files) {
+ bw.write(file.getAbsolutePath());
+ bw.newLine();
+ }
+ bw.flush();
}
- bw.flush();
}
}
} catch (IOException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
index 4609fb5..453e022 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -48,6 +48,7 @@ import java.util.*;
import java.util.Map.Entry;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -79,7 +80,7 @@ public class SyncSenderLogAnalyzerTest {
@Test
public void recover() throws IOException, MetadataException {
- Map<String, Map<Long, Set<File>>> allFileList = new HashMap<>();
+ Map<String, Map<Long, Map<Long, Set<File>>>> allFileList = new HashMap<>();
for (int i = 0; i < 3; i++) {
IoTDB.metaManager.setStorageGroup(new PartialPath(getSgName(i)));
@@ -87,12 +88,14 @@ public class SyncSenderLogAnalyzerTest {
Random r = new Random(0);
for (int i = 0; i < 3; i++) {
for (int j = 0; j < 5; j++) {
- allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()).computeIfAbsent(0L, k -> new HashSet<>());
+ allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashMap<>())
+ .computeIfAbsent(0L, k -> new HashSet<>());
String rand = r.nextInt(10000) + TSFILE_SUFFIX;
String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
- + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand;
+ + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand;
File file = new File(fileName);
- allFileList.get(getSgName(i)).get(0L).add(file);
+ allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}
@@ -108,10 +111,12 @@ public class SyncSenderLogAnalyzerTest {
manager.getValidFiles(dataDir);
assertTrue(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
senderLogger.startSyncTsFiles();
- for (Map<Long, Set<File>> map : allFileList.values()) {
- for (Set<File> newTsFiles : map.values()) {
- for (File file : newTsFiles) {
- senderLogger.finishSyncTsfile(file);
+ for (Map<Long, Map<Long, Set<File>>> map : allFileList.values()) {
+ for (Map<Long, Set<File>> vgMap : map.values()) {
+ for (Set<File> newTsFiles : vgMap.values()) {
+ for (File file : newTsFiles) {
+ senderLogger.finishSyncTsfile(file);
+ }
}
}
}
@@ -121,7 +126,7 @@ public class SyncSenderLogAnalyzerTest {
senderLogAnalyzer.recover();
manager.getValidFiles(dataDir);
assertFalse(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
- Map<String, Map<Long, Set<File>>> lastFilesMap = manager.getLastLocalFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> lastFilesMap = manager.getLastLocalFilesMap();
assertFileMap(allFileList, lastFilesMap);
// delete some files
@@ -131,10 +136,12 @@ public class SyncSenderLogAnalyzerTest {
manager.getValidFiles(dataDir);
assertFalse(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
senderLogger.startSyncDeletedFilesName();
- for (Map<Long, Set<File>> map : allFileList.values()) {
- for (Set<File> newTsFiles : map.values()) {
- for (File file : newTsFiles) {
- senderLogger.finishSyncDeletedFileName(file);
+ for (Map<Long, Map<Long, Set<File>>> map : allFileList.values()) {
+ for (Map<Long, Set<File>> vgMap : map.values()) {
+ for (Set<File> newTsFiles : vgMap.values()) {
+ for (File file : newTsFiles) {
+ senderLogger.finishSyncDeletedFileName(file);
+ }
}
}
}
@@ -144,17 +151,21 @@ public class SyncSenderLogAnalyzerTest {
manager.getValidFiles(dataDir);
assertTrue(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
assertTrue(SyncUtils.isEmpty(manager.getDeletedFilesMap()));
- Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+ Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
assertFileMap(allFileList, toBeSyncedFilesMap);
}
- private void assertFileMap(Map<String, Map<Long, Set<File>>> correctMap,
- Map<String, Map<Long, Set<File>>> curMap) {
- for (Entry<String, Map<Long, Set<File>>> entry : correctMap.entrySet()) {
+ private void assertFileMap(Map<String, Map<Long, Map<Long, Set<File>>>> correctMap,
+ Map<String, Map<Long, Map<Long, Set<File>>>> curMap) {
+ for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctMap.entrySet()) {
assertTrue(curMap.containsKey(entry.getKey()));
- for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
- assertTrue(
- curMap.get(entry.getKey()).get(innerEntry.getKey()).containsAll(innerEntry.getValue()));
+ for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+ assertTrue(curMap.get(entry.getKey()).containsKey(vgEntry.getKey()));
+ for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+ assertTrue(
+ curMap.get(entry.getKey()).get(vgEntry.getKey())
+ .get(innerEntry.getKey()).containsAll(innerEntry.getValue()));
+ }
}
}
}