You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/01/26 06:46:44 UTC

[iotdb] 01/01: fix sync bug for tsfiles' directory changed by virtual storage group

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch fix_sync_bug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit de8f140afd35a2fe5ef22a1331ed4a819b8b5d9f
Author: chaow <xu...@gmail.com>
AuthorDate: Tue Jan 26 14:45:50 2021 +0800

    fix sync bug for tsfiles' directory changed by virtual storage group
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |   4 +-
 .../db/sync/sender/manage/ISyncFileManager.java    |  11 +-
 .../db/sync/sender/manage/SyncFileManager.java     | 137 +++++++++++++--------
 .../iotdb/db/sync/sender/transfer/ISyncClient.java |   3 +-
 .../iotdb/db/sync/sender/transfer/SyncClient.java  |  47 ++++---
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |  12 +-
 .../db/sync/sender/manage/SyncFileManagerTest.java | 116 +++++++++--------
 .../sender/recover/SyncSenderLogAnalyzerTest.java  |  51 +++++---
 8 files changed, 228 insertions(+), 153 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 1170733..e25b5a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -773,7 +773,7 @@ public class StorageEngine implements IService {
 
   public void loadNewTsFileForSync(TsFileResource newTsFileResource)
       throws StorageEngineException, LoadFileException, IllegalPathException {
-    getProcessorDirectly(new PartialPath(getSgByEngineFile(newTsFileResource.getTsFile())))
+    getProcessorDirectly(new PartialPath(newTsFileResource.getTsFile().getParentFile().getName()))
         .loadNewTsFileForSync(newTsFileResource);
   }
 
@@ -791,7 +791,7 @@ public class StorageEngine implements IService {
 
   public boolean deleteTsfileForSync(File deletedTsfile)
       throws StorageEngineException, IllegalPathException {
-    return getProcessorDirectly(new PartialPath(getSgByEngineFile(deletedTsfile)))
+    return getProcessorDirectly(new PartialPath(deletedTsfile.getParentFile().getName()))
         .deleteTsfile(deletedTsfile);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
index 9fd9371..bfd9d77 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/ISyncFileManager.java
@@ -55,13 +55,14 @@ public interface ISyncFileManager {
    */
   void getValidFiles(String dataDir) throws IOException;
 
-  Map<String, Map<Long, Set<File>>> getCurrentSealedLocalFilesMap();
 
-  Map<String, Map<Long, Set<File>>> getLastLocalFilesMap();
+  Map<String, Map<Long, Map<Long, Set<File>>>> getCurrentSealedLocalFilesMap();
 
-  Map<String, Map<Long, Set<File>>> getDeletedFilesMap();
+  Map<String, Map<Long, Map<Long, Set<File>>>> getLastLocalFilesMap();
 
-  Map<String, Map<Long, Set<File>>> getToBeSyncedFilesMap();
+  Map<String, Map<Long, Map<Long, Set<File>>>> getDeletedFilesMap();
 
-  Map<String, Set<Long>> getAllSGs();
+  Map<String, Map<Long, Map<Long, Set<File>>>> getToBeSyncedFilesMap();
+
+  Map<String, Map<Long, Set<Long>>> getAllSGs();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
index 0a41280..163daea 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManager.java
@@ -47,33 +47,35 @@ public class SyncFileManager implements ISyncFileManager {
 
   /**
    * All storage groups on the disk where the current sync task is executed
+   * logicalSg -> <virtualSg, timeRangeId>
    */
-  private Map<String, Set<Long>> allSGs;
+  private Map<String, Map<Long, Set<Long>>> allSGs;
 
   /**
    * Key is storage group, value is all sealed tsfiles in the storage group. Inner key is time range
    * id, inner value is the set of current sealed tsfiles.
+   * logicalSg -> virtualSg ->  <timeRangeId, files>
    */
-  private Map<String, Map<Long, Set<File>>> currentSealedLocalFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> currentSealedLocalFilesMap;
 
   /**
    * Key is storage group, value is all last local tsfiles in the storage group, which doesn't
    * contains those tsfiles which are not synced successfully. Inner key is time range id, inner
    * value is the set of last local tsfiles.
    */
-  private Map<String, Map<Long, Set<File>>> lastLocalFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap;
 
   /**
    * Key is storage group, value is all deleted tsfiles which need to be synced to receiver end in
    * the storage group. Inner key is time range id, inner value is the valid set of sealed tsfiles.
    */
-  private Map<String, Map<Long, Set<File>>> deletedFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap;
 
   /**
    * Key is storage group, value is all new tsfiles which need to be synced to receiver end in the
    * storage group. Inner key is time range id, inner value is the valid set of new tsfiles.
    */
-  private Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap;
 
   private SyncFileManager() {
     IoTDB.metaManager.init();
@@ -90,7 +92,7 @@ public class SyncFileManager implements ISyncFileManager {
 
     currentSealedLocalFilesMap = new HashMap<>();
     // get all files in data dir sequence folder
-    Map<String, Map<Long, Set<File>>> currentAllLocalFiles = new HashMap<>();
+    Map<String, Map<Long, Map<Long, Set<File>>>> currentAllLocalFiles = new HashMap<>();
     if (!new File(dataDir + File.separatorChar + IoTDBConstant.SEQUENCE_FLODER_NAME).exists()) {
       return;
     }
@@ -102,35 +104,49 @@ public class SyncFileManager implements ISyncFileManager {
           .equals(TsFileConstant.TMP_SUFFIX)) {
         continue;
       }
-      allSGs.putIfAbsent(sgFolder.getName(), new HashSet<>());
+      allSGs.putIfAbsent(sgFolder.getName(), new HashMap<>());
       currentAllLocalFiles.putIfAbsent(sgFolder.getName(), new HashMap<>());
-      for (File timeRangeFolder : sgFolder.listFiles()) {
-        try {
-          Long timeRangeId = Long.parseLong(timeRangeFolder.getName());
-          currentAllLocalFiles.get(sgFolder.getName()).putIfAbsent(timeRangeId, new HashSet<>());
-          File[] files = timeRangeFolder.listFiles();
-          Arrays.stream(files)
-              .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()).get(timeRangeId)
-                  .add(new File(timeRangeFolder.getAbsolutePath(), file.getName())));
-        } catch (Exception e) {
-          LOGGER.error("Invalid time range folder: {}", timeRangeFolder.getAbsolutePath(), e);
-        }
+      for (File virtualSgFolder : sgFolder.listFiles()) {
+          try {
+            Long vgId = Long.parseLong(virtualSgFolder.getName());
+            allSGs.get(sgFolder.getName()).putIfAbsent(vgId, new HashSet<>());
+            currentAllLocalFiles.get(sgFolder.getName()).putIfAbsent(vgId, new HashMap<>());
+
+            for (File timeRangeFolder : virtualSgFolder.listFiles()) {
+              try {
+                Long timeRangeId = Long.parseLong(timeRangeFolder.getName());
+                currentAllLocalFiles.get(sgFolder.getName()).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+                File[] files = timeRangeFolder.listFiles();
+                Arrays.stream(files)
+                  .forEach(file -> currentAllLocalFiles.get(sgFolder.getName()).get(vgId).get(timeRangeId)
+                    .add(new File(timeRangeFolder.getAbsolutePath(), file.getName())));
+              } catch (Exception e) {
+                LOGGER.error("Invalid time range folder: {}", timeRangeFolder.getAbsolutePath(), e);
+              }
+            }
+          } catch (Exception e) {
+            LOGGER.error("Invalid virtual storage group folder: {}", virtualSgFolder.getAbsolutePath(), e);
+          }
       }
     }
 
     // get sealed tsfiles
-    for (Entry<String, Map<Long, Set<File>>> entry : currentAllLocalFiles.entrySet()) {
+    for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : currentAllLocalFiles.entrySet()) {
       String sgName = entry.getKey();
       currentSealedLocalFilesMap.putIfAbsent(sgName, new HashMap<>());
-      for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
-        Long timeRangeId = innerEntry.getKey();
-        currentSealedLocalFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
-        for (File file : innerEntry.getValue()) {
-          if (!file.getName().endsWith(TSFILE_SUFFIX)) {
-            continue;
-          }
-          if (checkFileValidity(file)) {
-            currentSealedLocalFilesMap.get(sgName).get(timeRangeId).add(file);
+      for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+        Long vgId = vgEntry.getKey();
+        currentSealedLocalFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>());
+        for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+          Long timeRangeId = innerEntry.getKey();
+          currentSealedLocalFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+          for (File file : innerEntry.getValue()) {
+            if (!file.getName().endsWith(TSFILE_SUFFIX)) {
+              continue;
+            }
+            if (checkFileValidity(file)) {
+              currentSealedLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).add(file);
+            }
           }
         }
       }
@@ -157,10 +173,13 @@ public class SyncFileManager implements ISyncFileManager {
       while ((filePath = reader.readLine()) != null) {
         File file = new File(filePath);
         Long timeRangeId = Long.parseLong(file.getParentFile().getName());
-        String sgName = file.getParentFile().getParentFile().getName();
-        allSGs.putIfAbsent(sgName, new HashSet<>());
+        Long vgId = Long.parseLong(file.getParentFile().getParentFile().getName());
+        String sgName = file.getParentFile().getParentFile().getParentFile().getName();
+        allSGs.putIfAbsent(sgName, new HashMap<>());
+        allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>());
         lastLocalFilesMap.computeIfAbsent(sgName, k -> new HashMap<>())
-            .computeIfAbsent(timeRangeId, k -> new HashSet<>()).add(file);
+          .computeIfAbsent(vgId, k -> new HashMap<>())
+          .computeIfAbsent(timeRangeId, k -> new HashSet<>()).add(file);
       }
     }
   }
@@ -177,28 +196,44 @@ public class SyncFileManager implements ISyncFileManager {
     for (String sgName : allSGs.keySet()) {
       toBeSyncedFilesMap.putIfAbsent(sgName, new HashMap<>());
       deletedFilesMap.putIfAbsent(sgName, new HashMap<>());
-      for (Entry<Long, Set<File>> entry : currentSealedLocalFilesMap
-          .getOrDefault(sgName, Collections.emptyMap()).entrySet()) {
-        Long timeRangeId = entry.getKey();
-        toBeSyncedFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
-        allSGs.get(sgName).add(timeRangeId);
-        for (File newFile : entry.getValue()) {
-          if (!lastLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+
+      for (Entry<Long, Map<Long, Set<File>>> entry : currentSealedLocalFilesMap
+        .getOrDefault(sgName, Collections.emptyMap()).entrySet()) {
+        Long vgId = entry.getKey();
+        toBeSyncedFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>());
+        allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>());
+
+        for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
+          Long timeRangeId = innerEntry.getKey();
+          toBeSyncedFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+          allSGs.get(sgName).get(vgId).add(timeRangeId);
+          for (File newFile : innerEntry.getValue()) {
+            if (!lastLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+              .getOrDefault(vgId, Collections.emptyMap())
               .getOrDefault(timeRangeId, Collections.emptySet()).contains(newFile)) {
-            toBeSyncedFilesMap.get(sgName).get(timeRangeId).add(newFile);
+              toBeSyncedFilesMap.get(sgName).get(vgId).get(timeRangeId).add(newFile);
+            }
           }
         }
       }
 
-      for (Entry<Long, Set<File>> entry : lastLocalFilesMap
+      for (Entry<Long, Map<Long, Set<File>>> entry : lastLocalFilesMap
           .getOrDefault(sgName, Collections.emptyMap()).entrySet()) {
-        Long timeRangeId = entry.getKey();
-        deletedFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
-        allSGs.get(sgName).add(timeRangeId);
-        for (File oldFile : entry.getValue()) {
-          if (!currentSealedLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+        Long vgId = entry.getKey();
+        deletedFilesMap.get(sgName).putIfAbsent(vgId, new HashMap<>());
+        allSGs.get(sgName).putIfAbsent(vgId, new HashSet<>());
+
+        for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
+          Long timeRangeId = innerEntry.getKey();
+          deletedFilesMap.get(sgName).get(vgId).putIfAbsent(timeRangeId, new HashSet<>());
+          allSGs.get(sgName).get(vgId).add(timeRangeId);
+
+          for (File oldFile : innerEntry.getValue()) {
+            if (!currentSealedLocalFilesMap.getOrDefault(sgName, Collections.emptyMap())
+              .getOrDefault(vgId, Collections.emptyMap())
               .getOrDefault(timeRangeId, Collections.emptySet()).contains(oldFile)) {
-            deletedFilesMap.get(sgName).get(timeRangeId).add(oldFile);
+              deletedFilesMap.get(sgName).get(vgId).get(timeRangeId).add(oldFile);
+            }
           }
         }
       }
@@ -206,27 +241,27 @@ public class SyncFileManager implements ISyncFileManager {
   }
 
   @Override
-  public Map<String, Map<Long, Set<File>>> getCurrentSealedLocalFilesMap() {
+  public Map<String, Map<Long, Map<Long, Set<File>>>> getCurrentSealedLocalFilesMap() {
     return currentSealedLocalFilesMap;
   }
 
   @Override
-  public Map<String, Map<Long, Set<File>>> getLastLocalFilesMap() {
+  public Map<String, Map<Long, Map<Long, Set<File>>>> getLastLocalFilesMap() {
     return lastLocalFilesMap;
   }
 
   @Override
-  public Map<String, Map<Long, Set<File>>> getDeletedFilesMap() {
+  public Map<String, Map<Long, Map<Long, Set<File>>>> getDeletedFilesMap() {
     return deletedFilesMap;
   }
 
   @Override
-  public Map<String, Map<Long, Set<File>>> getToBeSyncedFilesMap() {
+  public Map<String, Map<Long, Map<Long, Set<File>>>> getToBeSyncedFilesMap() {
     return toBeSyncedFilesMap;
   }
 
   @Override
-  public Map<String, Set<Long>> getAllSGs() {
+  public Map<String, Map<Long, Set<Long>>> getAllSGs() {
     return allSGs;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
index 15323bc..1d501ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/ISyncClient.java
@@ -92,10 +92,11 @@ public interface ISyncClient {
    * receiver.
    *
    * @param sgName storage group name
+   * @param vgId virtual group id
    * @param timeRangeId id of time range
    * @param toBeSyncFiles list of new tsfile names
    */
-  void syncDataFilesInOneGroup(String sgName, Long timeRangeId, Set<File> toBeSyncFiles)
+  void syncDataFilesInOneGroup(String sgName, Long vgId, Long timeRangeId, Set<File> toBeSyncFiles)
       throws SyncConnectionException, IOException, SyncDeviceOwnerConflictException;
 
   /**
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index b88c0de..71ede28 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -105,13 +105,13 @@ public class SyncClient implements ISyncClient {
 
   private SyncService.Client serviceClient;
 
-  private Map<String, Set<Long>> allSG;
+  private Map<String, Map<Long, Set<Long>>> allSG;
 
-  private Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap;
 
-  private Map<String, Map<Long, Set<File>>> deletedFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap;
 
-  private Map<String, Map<Long, Set<File>>> lastLocalFilesMap;
+  private Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap;
 
   /**
    * If true, sync is in execution.
@@ -454,7 +454,7 @@ public class SyncClient implements ISyncClient {
       syncStatus = true;
 
       List<String> storageGroups = config.getStorageGroupList();
-      for (Entry<String, Set<Long>> entry : allSG.entrySet()) {
+      for (Entry<String, Map<Long, Set<Long>>> entry : allSG.entrySet()) {
         String sgName = entry.getKey();
         if (!storageGroups.isEmpty() && !storageGroups.contains(sgName)) {
           continue;
@@ -470,17 +470,22 @@ public class SyncClient implements ISyncClient {
           throw new SyncConnectionException("Unable to connect to receiver", e);
         }
         logger.info(
-            "Sync process starts to transfer data of storage group {}, it has {} time ranges.",
+            "Sync process starts to transfer data of storage group {}, it has {} virtual storage group.",
             sgName, entry.getValue().size());
         try {
-          for (Long timeRangeId : entry.getValue()) {
-            lastLocalFilesMap.get(sgName).putIfAbsent(timeRangeId, new HashSet<>());
-            syncDeletedFilesNameInOneGroup(sgName, timeRangeId,
+          for (Entry<Long, Set<Long>> vgEntry : entry.getValue().entrySet()) {
+            lastLocalFilesMap.get(sgName).putIfAbsent(vgEntry.getKey(), new HashMap<>());
+            for (Long timeRangeId : vgEntry.getValue()) {
+              lastLocalFilesMap.get(sgName).get(vgEntry.getKey()).putIfAbsent(timeRangeId, new HashSet<>());
+              syncDeletedFilesNameInOneGroup(sgName, timeRangeId,
                 deletedFilesMap.getOrDefault(sgName, Collections.emptyMap())
-                    .getOrDefault(timeRangeId, Collections.emptySet()));
-            syncDataFilesInOneGroup(sgName, timeRangeId,
+                  .getOrDefault(vgEntry.getKey(), Collections.emptyMap())
+                  .getOrDefault(timeRangeId, Collections.emptySet()));
+              syncDataFilesInOneGroup(sgName, vgEntry.getKey(), timeRangeId,
                 toBeSyncedFilesMap.getOrDefault(sgName, Collections.emptyMap())
-                    .getOrDefault(timeRangeId, Collections.emptySet()));
+                  .getOrDefault(vgEntry.getKey(), Collections.emptyMap())
+                  .getOrDefault(timeRangeId, Collections.emptySet()));
+            }
           }
         } catch (SyncDeviceOwnerConflictException e) {
           deletedFilesMap.remove(sgName);
@@ -528,7 +533,7 @@ public class SyncClient implements ISyncClient {
   }
 
   @Override
-  public void syncDataFilesInOneGroup(String sgName, Long timeRangeId, Set<File> toBeSyncFiles)
+  public void syncDataFilesInOneGroup(String sgName, Long vgId, Long timeRangeId, Set<File> toBeSyncFiles)
       throws SyncConnectionException, IOException, SyncDeviceOwnerConflictException {
     if (toBeSyncFiles.isEmpty()) {
       logger.info("There has no new tsfiles to be synced in storage group {}", sgName);
@@ -544,7 +549,7 @@ public class SyncClient implements ISyncClient {
         // firstly sync .resource file, then sync tsfile
         syncSingleFile(new File(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
         syncSingleFile(snapshotFile);
-        lastLocalFilesMap.get(sgName).get(timeRangeId).add(tsfile);
+        lastLocalFilesMap.get(sgName).get(vgId).get(timeRangeId).add(tsfile);
         syncLog.finishSyncTsfile(tsfile);
         logger.info("Task of synchronization has completed {}/{}.", cnt, toBeSyncFiles.size());
       } catch (IOException e) {
@@ -638,13 +643,15 @@ public class SyncClient implements ISyncClient {
 
     // 1. Write file list to currentLocalFile
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
-      for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
-        for (Set<File> files : currentLocalFiles.values()) {
-          for (File file : files) {
-            bw.write(file.getAbsolutePath());
-            bw.newLine();
+      for (Map<Long, Map<Long, Set<File>>> vgCurrentLocalFiles : lastLocalFilesMap.values()) {
+        for (Map<Long, Set<File>> currentLocalFiles : vgCurrentLocalFiles.values()) {
+          for (Set<File> files : currentLocalFiles.values()) {
+            for (File file : files) {
+              bw.write(file.getAbsolutePath());
+              bw.newLine();
+            }
+            bw.flush();
           }
-          bw.flush();
         }
       }
     } catch (IOException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 2401a01..e5446a0 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -49,11 +49,13 @@ public class SyncUtils {
   /**
    * Verify sending list is empty or not It's used by sync sender.
    */
-  public static boolean isEmpty(Map<String, Map<Long, Set<File>>> sendingFileList) {
-    for (Entry<String, Map<Long, Set<File>>> entry: sendingFileList.entrySet()) {
-      for(Entry<Long, Set<File>> innerEntry: entry.getValue().entrySet()) {
-        if (!innerEntry.getValue().isEmpty()) {
-          return false;
+  public static boolean isEmpty(Map<String, Map<Long, Map<Long, Set<File>>>> sendingFileList) {
+    for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry: sendingFileList.entrySet()) {
+      for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+        for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+          if (!innerEntry.getValue().isEmpty()) {
+            return false;
+          }
         }
       }
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
index e742d57..c82f346 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
@@ -71,7 +71,7 @@ public class SyncFileManagerTest {
 
   @Test
   public void testGetValidFiles() throws IOException, MetadataException {
-    Map<String, Map<Long, Set<File>>> allFileList = new HashMap<>();
+    Map<String, Map<Long, Map<Long, Set<File>>>> allFileList = new HashMap<>();
 
     Random r = new Random(0);
     for (int i = 0; i < 3; i++) {
@@ -80,12 +80,13 @@ public class SyncFileManagerTest {
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 5; j++) {
         allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+            .computeIfAbsent(0L, k -> new HashMap<>())
             .computeIfAbsent(0L, k -> new HashSet<>());
         String rand = r.nextInt(10000) + TSFILE_SUFFIX;
         String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
-            + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand;
+            + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand;
         File file = new File(fileName);
-        allFileList.get(getSgName(i)).get(0L).add(file);
+        allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
         if (!file.getParentFile().exists()) {
           file.getParentFile().mkdirs();
         }
@@ -106,25 +107,27 @@ public class SyncFileManagerTest {
     updateLastLocalFiles(allFileList);
 
     manager.getValidFiles(dataDir);
-    Map<String, Map<Long, Set<File>>> lastFileMap = manager.getLastLocalFilesMap();
+    Map<String, Map<Long, Map<Long, Set<File>>>> lastFileMap = manager.getLastLocalFilesMap();
     assertFileMap(allFileList, lastFileMap);
 
     // add some files
-    Map<String, Map<Long, Set<File>>> correctToBeSyncedFiles = new HashMap<>();
+    Map<String, Map<Long, Map<Long, Set<File>>>> correctToBeSyncedFiles = new HashMap<>();
     r = new Random(1);
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 5; j++) {
         allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashMap<>())
             .computeIfAbsent(0L, k -> new HashSet<>());
         correctToBeSyncedFiles.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashMap<>())
             .computeIfAbsent(0L, k -> new HashSet<>());
         String rand = r.nextInt(10000) + TSFILE_SUFFIX;
         String fileName =
             FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
-                + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand;
+                + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator + rand;
         File file = new File(fileName);
-        allFileList.get(getSgName(i)).get(0L).add(file);
-        correctToBeSyncedFiles.get(getSgName(i)).get(0L).add(file);
+        allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
+        correctToBeSyncedFiles.get(getSgName(i)).get(0L).get(0L).add(file);
         if (!file.getParentFile().exists()) {
           file.getParentFile().mkdirs();
         }
@@ -138,8 +141,8 @@ public class SyncFileManagerTest {
       }
     }
     manager.getValidFiles(dataDir);
-    Map<String, Map<Long, Set<File>>> curFileMap = manager.getCurrentSealedLocalFilesMap();
-    Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+    Map<String, Map<Long, Map<Long, Set<File>>>> curFileMap = manager.getCurrentSealedLocalFilesMap();
+    Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
     assertFileMap(allFileList, curFileMap);
     assertFileMap(correctToBeSyncedFiles, toBeSyncedFilesMap);
 
@@ -155,17 +158,19 @@ public class SyncFileManagerTest {
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 5; j++) {
         allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
-            .computeIfAbsent(0L, k -> new HashSet<>());
+          .computeIfAbsent(0L, k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashSet<>());
         correctToBeSyncedFiles.computeIfAbsent(getSgName(i), k -> new HashMap<>())
-            .computeIfAbsent(0L, k -> new HashSet<>());
+          .computeIfAbsent(0L, k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashSet<>());
         String rand = r.nextInt(10000) + TSFILE_SUFFIX;
         String fileName =
             FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
-                + File.separator + getSgName(i) + File.separator + "0" + File.separator
+                + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator
                 + File.separator + rand;
         File file = new File(fileName);
-        allFileList.get(getSgName(i)).get(0L).add(file);
-        correctToBeSyncedFiles.get(getSgName(i)).get(0L).add(file);
+        allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
+        correctToBeSyncedFiles.get(getSgName(i)).get(0L).get(0L).add(file);
         if (!file.getParentFile().exists()) {
           file.getParentFile().mkdirs();
         }
@@ -179,35 +184,41 @@ public class SyncFileManagerTest {
       }
     }
     int count = 0;
-    Map<String, Map<Long, Set<File>>> correctDeleteFile = new HashMap<>();
-    for (Entry<String, Map<Long, Set<File>>> entry : allFileList.entrySet()) {
+    Map<String, Map<Long, Map<Long, Set<File>>>> correctDeleteFile = new HashMap<>();
+    for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : allFileList.entrySet()) {
       correctDeleteFile.put(entry.getKey(), new HashMap<>());
-      for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
-        Set<File> files = innerEntry.getValue();
-        correctDeleteFile.get(entry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
-        for (File file : files) {
-          count++;
-          if (count % 3 == 0 && lastFileMap.get(entry.getKey()).get(0L).contains(file)) {
-            correctDeleteFile.get(entry.getKey()).get(0L).add(file);
+      for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+        correctDeleteFile.get(entry.getKey()).putIfAbsent(vgEntry.getKey(), new HashMap<>());
+        for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+          Set<File> files = innerEntry.getValue();
+          correctDeleteFile.get(entry.getKey()).get(vgEntry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
+          for (File file : files) {
+            count++;
+            if (count % 3 == 0 && lastFileMap.get(entry.getKey()).get(0L).get(0L).contains(file)) {
+              correctDeleteFile.get(entry.getKey()).get(0L).get(0L).add(file);
+            }
           }
         }
       }
     }
-    for (Entry<String, Map<Long, Set<File>>> entry : correctDeleteFile.entrySet()) {
+    for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctDeleteFile.entrySet()) {
       correctDeleteFile.put(entry.getKey(), new HashMap<>());
-      for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
-        Set<File> files = innerEntry.getValue();
-        correctDeleteFile.get(entry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
-        for (File file : innerEntry.getValue()) {
-          file.delete();
-          new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete();
-          allFileList.get(entry.getKey()).get(0L).remove(file);
+      for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+        correctDeleteFile.get(entry.getKey()).putIfAbsent(vgEntry.getKey(), new HashMap<>());
+        for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+          Set<File> files = innerEntry.getValue();
+          correctDeleteFile.get(entry.getKey()).get(vgEntry.getKey()).putIfAbsent(innerEntry.getKey(), new HashSet<>());
+          for (File file : innerEntry.getValue()) {
+            file.delete();
+            new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).delete();
+            allFileList.get(entry.getKey()).get(0L).get(0L).remove(file);
+          }
         }
       }
     }
     manager.getValidFiles(dataDir);
     curFileMap = manager.getCurrentSealedLocalFilesMap();
-    Map<String, Map<Long, Set<File>>> deletedFilesMap = manager.getDeletedFilesMap();
+    Map<String, Map<Long, Map<Long, Set<File>>>> deletedFilesMap = manager.getDeletedFilesMap();
     toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
     assertFileMap(allFileList, curFileMap);
     assertFileMap(correctDeleteFile, deletedFilesMap);
@@ -218,14 +229,15 @@ public class SyncFileManagerTest {
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 5; j++) {
         allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
-            .computeIfAbsent(0L, k -> new HashSet<>());
+          .computeIfAbsent(0L, k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashSet<>());
         String rand = String.valueOf(r.nextInt(10000));
         String fileName =
             FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
-                + File.separator + getSgName(i) + File.separator + "0" + File.separator
+                + File.separator + getSgName(i) + File.separator + "0" + File.separator + "0" + File.separator
                 + File.separator + rand;
         File file = new File(fileName);
-        allFileList.get(getSgName(i)).get(0L).add(file);
+        allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
         if (!file.getParentFile().exists()) {
           file.getParentFile().mkdirs();
         }
@@ -245,13 +257,17 @@ public class SyncFileManagerTest {
     assertFileMap(correctToBeSyncedFiles, toBeSyncedFilesMap);
   }
 
-  private void assertFileMap(Map<String, Map<Long, Set<File>>> correctMap,
-      Map<String, Map<Long, Set<File>>> curMap) {
-    for (Entry<String, Map<Long, Set<File>>> entry : correctMap.entrySet()) {
+  private void assertFileMap(Map<String, Map<Long, Map<Long, Set<File>>>> correctMap,
+      Map<String, Map<Long, Map<Long, Set<File>>>> curMap) {
+    for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctMap.entrySet()) {
       assertTrue(curMap.containsKey(entry.getKey()));
-      for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
-        assertTrue(
-            curMap.get(entry.getKey()).get(innerEntry.getKey()).containsAll(innerEntry.getValue()));
+      for (Entry<Long, Map<Long, Set<File>>> innerEntry : entry.getValue().entrySet()) {
+        assertTrue(curMap.get(entry.getKey()).containsKey(innerEntry.getKey()));
+        for (Entry<Long, Set<File>> fileEntry : innerEntry.getValue().entrySet()) {
+          assertTrue(
+            curMap.get(entry.getKey()).get(innerEntry.getKey())
+              .get(fileEntry.getKey()).containsAll(fileEntry.getValue()));
+        }
       }
     }
   }
@@ -260,16 +276,18 @@ public class SyncFileManagerTest {
     return IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + i;
   }
 
-  private void updateLastLocalFiles(Map<String, Map<Long, Set<File>>> lastLocalFilesMap) {
+  private void updateLastLocalFiles(Map<String, Map<Long, Map<Long, Set<File>>>> lastLocalFilesMap) {
     try (BufferedWriter bw = new BufferedWriter(
         new FileWriter(new File(config.getLastFileInfoPath())))) {
-      for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) {
-        for (Set<File> files : currentLocalFiles.values()) {
-          for (File file : files) {
-            bw.write(file.getAbsolutePath());
-            bw.newLine();
+      for (Map<Long, Map<Long, Set<File>>> currentLocalFiles : lastLocalFilesMap.values()) {
+        for (Map<Long, Set<File>> vgFiles : currentLocalFiles.values()) {
+          for (Set<File> files : vgFiles.values()) {
+            for (File file : files) {
+              bw.write(file.getAbsolutePath());
+              bw.newLine();
+            }
+            bw.flush();
           }
-          bw.flush();
         }
       }
     } catch (IOException e) {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
index 4609fb5..453e022 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -48,6 +48,7 @@ import java.util.*;
 import java.util.Map.Entry;
 
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -79,7 +80,7 @@ public class SyncSenderLogAnalyzerTest {
 
   @Test
   public void recover() throws IOException, MetadataException {
-    Map<String, Map<Long, Set<File>>> allFileList = new HashMap<>();
+    Map<String, Map<Long, Map<Long, Set<File>>>> allFileList = new HashMap<>();
 
     for (int i = 0; i < 3; i++) {
       IoTDB.metaManager.setStorageGroup(new PartialPath(getSgName(i)));
@@ -87,12 +88,14 @@ public class SyncSenderLogAnalyzerTest {
     Random r = new Random(0);
     for (int i = 0; i < 3; i++) {
       for (int j = 0; j < 5; j++) {
-        allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>()).computeIfAbsent(0L, k -> new HashSet<>());
+        allFileList.computeIfAbsent(getSgName(i), k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashMap<>())
+          .computeIfAbsent(0L, k -> new HashSet<>());
         String rand = r.nextInt(10000) + TSFILE_SUFFIX;
         String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
-            + File.separator + getSgName(i) + File.separator + "0" + File.separator + rand;
+            + File.separator + getSgName(i) + File.separator + "0"  + File.separator + "0" + File.separator + rand;
         File file = new File(fileName);
-        allFileList.get(getSgName(i)).get(0L).add(file);
+        allFileList.get(getSgName(i)).get(0L).get(0L).add(file);
         if (!file.getParentFile().exists()) {
           file.getParentFile().mkdirs();
         }
@@ -108,10 +111,12 @@ public class SyncSenderLogAnalyzerTest {
     manager.getValidFiles(dataDir);
     assertTrue(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
     senderLogger.startSyncTsFiles();
-    for (Map<Long, Set<File>> map : allFileList.values()) {
-      for (Set<File> newTsFiles : map.values()) {
-        for (File file : newTsFiles) {
-          senderLogger.finishSyncTsfile(file);
+    for (Map<Long, Map<Long, Set<File>>> map : allFileList.values()) {
+      for (Map<Long, Set<File>> vgMap : map.values()) {
+        for (Set<File> newTsFiles : vgMap.values()) {
+          for (File file : newTsFiles) {
+            senderLogger.finishSyncTsfile(file);
+          }
         }
       }
     }
@@ -121,7 +126,7 @@ public class SyncSenderLogAnalyzerTest {
     senderLogAnalyzer.recover();
     manager.getValidFiles(dataDir);
     assertFalse(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
-    Map<String, Map<Long, Set<File>>> lastFilesMap = manager.getLastLocalFilesMap();
+    Map<String, Map<Long, Map<Long, Set<File>>>> lastFilesMap = manager.getLastLocalFilesMap();
     assertFileMap(allFileList, lastFilesMap);
 
     // delete some files
@@ -131,10 +136,12 @@ public class SyncSenderLogAnalyzerTest {
     manager.getValidFiles(dataDir);
     assertFalse(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
     senderLogger.startSyncDeletedFilesName();
-    for (Map<Long, Set<File>> map : allFileList.values()) {
-      for (Set<File> newTsFiles : map.values()) {
-        for (File file : newTsFiles) {
-          senderLogger.finishSyncDeletedFileName(file);
+    for (Map<Long, Map<Long, Set<File>>> map : allFileList.values()) {
+      for (Map<Long, Set<File>> vgMap : map.values()) {
+        for (Set<File> newTsFiles : vgMap.values()) {
+          for (File file : newTsFiles) {
+            senderLogger.finishSyncDeletedFileName(file);
+          }
         }
       }
     }
@@ -144,17 +151,21 @@ public class SyncSenderLogAnalyzerTest {
     manager.getValidFiles(dataDir);
     assertTrue(SyncUtils.isEmpty(manager.getLastLocalFilesMap()));
     assertTrue(SyncUtils.isEmpty(manager.getDeletedFilesMap()));
-    Map<String, Map<Long, Set<File>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+    Map<String, Map<Long, Map<Long, Set<File>>>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
     assertFileMap(allFileList, toBeSyncedFilesMap);
   }
 
-  private void assertFileMap(Map<String, Map<Long, Set<File>>> correctMap,
-      Map<String, Map<Long, Set<File>>> curMap) {
-    for (Entry<String, Map<Long, Set<File>>> entry : correctMap.entrySet()) {
+  private void assertFileMap(Map<String, Map<Long, Map<Long, Set<File>>>> correctMap,
+      Map<String, Map<Long, Map<Long, Set<File>>>> curMap) {
+    for (Entry<String, Map<Long, Map<Long, Set<File>>>> entry : correctMap.entrySet()) {
       assertTrue(curMap.containsKey(entry.getKey()));
-      for (Entry<Long, Set<File>> innerEntry : entry.getValue().entrySet()) {
-        assertTrue(
-            curMap.get(entry.getKey()).get(innerEntry.getKey()).containsAll(innerEntry.getValue()));
+      for (Entry<Long, Map<Long, Set<File>>> vgEntry : entry.getValue().entrySet()) {
+        assertTrue(curMap.get(entry.getKey()).containsKey(vgEntry.getKey()));
+        for (Entry<Long, Set<File>> innerEntry : vgEntry.getValue().entrySet()) {
+          assertTrue(
+            curMap.get(entry.getKey()).get(vgEntry.getKey())
+              .get(innerEntry.getKey()).containsAll(innerEntry.getValue()));
+        }
       }
     }
   }