You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/02 09:50:10 UTC

[incubator-iotdb] branch reimpl_sync updated (32e2ff8 -> 2060fea)

This is an automated email from the ASF dual-hosted git repository.

lta pushed a change to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


    from 32e2ff8  add sync file manager test
     new a81f280  add sync sender log analyzer
     new 1785cab  complete load tsfiles module
     new 2060fea  add snapshot unit test

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 +
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  23 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 247 +++++++++++++++++++--
 .../db/engine/storagegroup/TsFileResource.java     |   4 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    |  43 +++-
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   5 +-
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |  19 +-
 .../sync/sender/transfer/DataTransferManager.java  |   8 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |   2 +-
 .../db/sync/sender/manage/SyncFileManagerTest.java |  18 ++
 .../sender/recover/SyncSenderLogAnalyzerTest.java  | 157 +++++++++++++
 .../sync/sender/recover/SyncSenderLoggerTest.java  | 107 +++++++++
 .../sender/transfer/DataTransferManagerTest.java   | 135 +++++++++++
 13 files changed, 745 insertions(+), 34 deletions(-)
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
 create mode 100644 server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java


[incubator-iotdb] 03/03: add snapshot unit test

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 2060feaa0517de2a2a3d535613c4bcabbcdf0694
Author: lta <li...@163.com>
AuthorDate: Mon Sep 2 17:49:52 2019 +0800

    add snapshot unit test
---
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   2 +-
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   9 +-
 .../sync/sender/transfer/DataTransferManager.java  |   6 +-
 .../java/org/apache/iotdb/db/utils/SyncUtils.java  |   2 +-
 .../sender/recover/SyncSenderLogAnalyzerTest.java  |  18 +++
 .../sender/transfer/DataTransferManagerTest.java   | 135 +++++++++++++++++++++
 6 files changed, 165 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index 3d7a356..652839b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -34,6 +34,6 @@ public interface ISyncSenderLogAnalyzer {
 
   void loadLogger(Set<String> deletedFiles, Set<String> newFiles);
 
-  void clearLogger(Set<String> currentLocalFiles);
+  void updateLastLocalFile(Set<String> currentLocalFiles);
 
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index 6eaec50..5f97f05 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -58,7 +58,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
       loadLogger(deletedFiles, newFiles);
       lastLocalFiles.removeAll(deletedFiles);
       lastLocalFiles.addAll(newFiles);
-      clearLogger(lastLocalFiles);
+      updateLastLocalFile(lastLocalFiles);
     }
     FileUtils.deleteDirectory(new File(senderPath, Constans.DATA_SNAPSHOT_NAME));
     syncLogFile.delete();
@@ -67,6 +67,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
   @Override
   public void loadLastLocalFiles(Set<String> lastLocalFiles) {
     if (!lastLocalFile.exists()) {
+      LOGGER.info("last local  file {} doesn't exist.", syncLogFile.getAbsolutePath());
       return;
     }
     try (BufferedReader br = new BufferedReader(new FileReader(lastLocalFile))) {
@@ -83,6 +84,10 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   @Override
   public void loadLogger(Set<String> deletedFiles, Set<String> newFiles) {
+    if (!syncLogFile.exists()) {
+      LOGGER.info("log file {} doesn't exist.", syncLogFile.getAbsolutePath());
+      return;
+    }
     try (BufferedReader br = new BufferedReader(new FileReader(syncLogFile))) {
       String line;
       int mode = 0;
@@ -107,7 +112,7 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
   }
 
   @Override
-  public void clearLogger(Set<String> currentLocalFiles) {
+  public void updateLastLocalFile(Set<String> currentLocalFiles) {
     try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) {
       for (String line : currentLocalFiles) {
         bw.write(line);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 6331d2c..aecf59e 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -524,18 +524,18 @@ public class DataTransferManager implements IDataTransferManager {
    *
    * @param file new tsfile to be synced
    */
-  private File makeFileSnapshot(File file) throws IOException {
+  File makeFileSnapshot(File file) throws IOException {
     File snapshotFile = SyncUtils.getSnapshotFile(file);
     if (!snapshotFile.getParentFile().exists()) {
       snapshotFile.getParentFile().mkdirs();
     }
     Path link = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
-    Path target = FileSystems.getDefault().getPath(snapshotFile.getAbsolutePath());
+    Path target = FileSystems.getDefault().getPath(file.getAbsolutePath());
     Files.createLink(link, target);
     link = FileSystems.getDefault()
         .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     target = FileSystems.getDefault()
-        .getPath(snapshotFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
+        .getPath(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX);
     Files.createLink(link, target);
     return snapshotFile;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
index 13247cb..599edac 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SyncUtils.java
@@ -38,7 +38,7 @@ public class SyncUtils {
    * sender.
    */
   public static File getSnapshotFile(File file) {
-    String relativeFilePath = file.getParent() + File.separator + file.getName();
+    String relativeFilePath = file.getParentFile().getName() + File.separator + file.getName();
     String snapshotDir = SyncSenderDescriptor.getInstance().getConfig().getSnapshotPath();
     if (!new File(snapshotDir).exists()) {
       new File(snapshotDir).mkdirs();
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
index c30eb8f..71c9629 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.recover;
 
 import java.io.File;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java
new file mode 100644
index 0000000..ef49dd5
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManagerTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.transfer;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DataTransferManagerTest {
+
+  private static final Logger logger = LoggerFactory.getLogger(DataTransferManagerTest.class);
+  private DataTransferManager manager = DataTransferManager.getInstance();
+  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private String dataDir;
+  private SyncSenderLogAnalyzer senderLogAnalyzer;
+
+  @Before
+  public void setUp()
+      throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException {
+    EnvironmentUtils.envSetUp();
+    dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+        .getParentFile().getAbsolutePath();
+    config.update(dataDir);
+    senderLogAnalyzer = new SyncSenderLogAnalyzer(config.getSenderFolderPath());
+  }
+
+  @After
+  public void tearDown() throws InterruptedException, IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void makeFileSnapshot() throws IOException {
+    Map<String, Set<File>> allFileList = new HashMap<>();
+
+    Random r = new Random(0);
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 5; j++) {
+        if (!allFileList.containsKey(String.valueOf(i))) {
+          allFileList.put(String.valueOf(i), new HashSet<>());
+        }
+        String rand = String.valueOf(r.nextInt(10000));
+        String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
+            + File.separator + i
+            + File.separator + rand;
+        File file = new File(fileName);
+        allFileList.get(String.valueOf(i)).add(file);
+        if (!file.getParentFile().exists()) {
+          file.getParentFile().mkdirs();
+        }
+        if (!file.exists() && !file.createNewFile()) {
+          logger.error("Can not create new file {}", file.getPath());
+        }
+        if (!new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()
+            && !new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).createNewFile()) {
+          logger.error("Can not create new file {}", file.getPath());
+        }
+      }
+    }
+
+    Map<String, Set<String>> dataFileMap = new HashMap<>();
+    File sequenceFile = new File(dataDir, IoTDBConstant.SEQUENCE_FLODER_NAME);
+    for(File sgFile: sequenceFile.listFiles()){
+      dataFileMap.putIfAbsent(sgFile.getName(), new HashSet<>());
+      for (File tsfile : sgFile.listFiles()) {
+        if (!tsfile.getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+          manager.makeFileSnapshot(tsfile);
+        }
+        dataFileMap.get(sgFile.getName()).add(tsfile.getName());
+      }
+    }
+
+    assert new File(config.getSenderFolderPath()).exists();
+    assert new File(config.getSnapshotPath()).exists();
+
+    Map<String, Set<String>> snapFileMap = new HashMap<>();
+    for(File sgFile: new File(config.getSnapshotPath()).listFiles()){
+      snapFileMap.putIfAbsent(sgFile.getName(), new HashSet<>());
+      for(File snapshotTsfile: sgFile.listFiles()){
+        snapFileMap.get(sgFile.getName()).add(snapshotTsfile.getName());
+      }
+    }
+
+    assert dataFileMap.size() == snapFileMap.size();
+    for(Entry<String, Set<String>> entry: dataFileMap.entrySet()){
+      String sg = entry.getKey();
+      Set<String> tsfiles = entry.getValue();
+      assert snapFileMap.containsKey(sg);
+      assert snapFileMap.get(sg).size() == tsfiles.size();
+      assert snapFileMap.get(sg).containsAll(tsfiles);
+    }
+
+    assert !new File(config.getLastFileInfo()).exists();
+    senderLogAnalyzer.recover();
+    assert !new File(config.getSnapshotPath()).exists();
+    assert new File(config.getLastFileInfo()).exists();
+  }
+}
\ No newline at end of file


[incubator-iotdb] 01/03: add sync sender log analyzer

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a81f28010c9e494f81f3924a198ed9e161ae7fc8
Author: lta <li...@163.com>
AuthorDate: Fri Aug 30 21:23:36 2019 +0800

    add sync sender log analyzer
---
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   3 +
 .../db/sync/sender/manage/SyncFileManagerTest.java |  18 ++++
 .../sender/recover/SyncSenderLogAnalyzerTest.java  | 114 +++++++++++++++++++++
 .../sync/sender/recover/SyncSenderLoggerTest.java  | 107 +++++++++++++++++++
 4 files changed, 242 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index 60d45ed..af4f00c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -61,6 +61,9 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   @Override
   public void loadLastLocalFiles(Set<String> lastLocalFiles) {
+    if (!lastLocalFile.exists()) {
+      return;
+    }
     try (BufferedReader br = new BufferedReader(new FileReader(lastLocalFile))) {
       String line;
       while ((line = br.readLine()) != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
index 91e59d3..46f339c 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.manage;
 
 import java.io.BufferedWriter;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
new file mode 100644
index 0000000..6b65776
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -0,0 +1,114 @@
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncSenderLogAnalyzerTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzerTest.class);
+  private SyncSenderLogger senderLogger;
+  private SyncSenderLogAnalyzer senderLogAnalyzer;
+  private SyncFileManager manager = SyncFileManager.getInstance();
+  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private String dataDir;
+
+  @Before
+  public void setUp()
+      throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException {
+    EnvironmentUtils.envSetUp();
+    dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+        .getParentFile().getAbsolutePath();
+    config.update(dataDir);
+    senderLogger = new SyncSenderLogger(
+        new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME));
+    senderLogAnalyzer = new SyncSenderLogAnalyzer(config.getSenderFolderPath());
+  }
+
+  @After
+  public void tearDown() throws InterruptedException, IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void recover() throws IOException {
+    Map<String, Set<File>> allFileList = new HashMap<>();
+
+    Random r = new Random(0);
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 5; j++) {
+        if (!allFileList.containsKey(String.valueOf(i))) {
+          allFileList.put(String.valueOf(i), new HashSet<>());
+        }
+        String rand = String.valueOf(r.nextInt(10000));
+        String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
+            + File.separator + i
+            + File.separator + rand;
+        File file = new File(fileName);
+        allFileList.get(String.valueOf(i)).add(file);
+        if (!file.getParentFile().exists()) {
+          file.getParentFile().mkdirs();
+        }
+        if (!file.exists() && !file.createNewFile()) {
+          LOGGER.error("Can not create new file {}", file.getPath());
+        }
+        if (!new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()
+            && !new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).createNewFile()) {
+          LOGGER.error("Can not create new file {}", file.getPath());
+        }
+      }
+    }
+    manager.getValidFiles(dataDir);
+    assert isEmpty(manager.getLastLocalFilesMap());
+    senderLogger.startSyncTsFiles();
+    for(Set<File> newTsFiles:allFileList.values()){
+      for(File file: newTsFiles){
+        senderLogger.finishSyncTsfile(file);
+      }
+    }
+    senderLogger.close();
+
+    // recover log
+    senderLogAnalyzer.recover();
+    manager.getValidFiles(dataDir);
+    assert !isEmpty(manager.getLastLocalFilesMap());
+    Map<String, Set<File>> lastFilesMap = manager.getLastLocalFilesMap();
+    for (Entry<String, Set<File>> entry : allFileList.entrySet()) {
+      assert lastFilesMap.containsKey(entry.getKey());
+      assert lastFilesMap.get(entry.getKey()).size() == entry.getValue().size();
+      assert lastFilesMap.get(entry.getKey()).containsAll(entry.getValue());
+    }
+  }
+
+  private boolean isEmpty(Map<String, Set<File>> sendingFileList) {
+    for (Entry<String, Set<File>> entry : sendingFileList.entrySet()) {
+      if (!entry.getValue().isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
new file mode 100644
index 0000000..3fe2e33
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SyncSenderLoggerTest {
+
+  private SyncSenderLogger senderLogger;
+  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private String dataDir;
+
+  @Before
+  public void setUp()
+      throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException {
+    EnvironmentUtils.envSetUp();
+    dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+        .getParentFile().getAbsolutePath();
+    config.update(dataDir);
+  }
+
+  @After
+  public void tearDown() throws InterruptedException, IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  @Test
+  public void testSyncSenderLogger() throws IOException {
+    senderLogger = new SyncSenderLogger(
+        new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME));
+    Set<String> deletedFileNames = new HashSet<>();
+    Set<String> deletedFileNamesTest = new HashSet<>();
+    senderLogger.startSyncDeletedFilesName();
+    for (int i = 0; i < 100; i++) {
+      senderLogger
+          .finishSyncDeletedFileName(new File(config.getSenderFolderPath(), "deleted" + i));
+      deletedFileNames
+          .add(new File(config.getSenderFolderPath(), "deleted" + i).getAbsolutePath());
+    }
+    Set<String> toBeSyncedFiles = new HashSet<>();
+    Set<String> toBeSyncedFilesTest = new HashSet<>();
+    senderLogger.startSyncTsFiles();
+    for (int i = 0; i < 100; i++) {
+      senderLogger
+          .finishSyncTsfile(new File(config.getSenderFolderPath(), "new" + i));
+      toBeSyncedFiles
+          .add(new File(config.getSenderFolderPath(), "new" + i).getAbsolutePath());
+    }
+    int count = 0;
+    int mode = 0;
+    try (BufferedReader br = new BufferedReader(
+        new FileReader(new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME)))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        count++;
+        if(line.equals(SyncSenderLogger.SYNC_DELETED_FILE_NAME_START)){
+          mode = -1;
+        } else if(line.equals(SyncSenderLogger.SYNC_TSFILE_START)){
+          mode = 1;
+        } else {
+          if(mode == -1){
+            deletedFileNamesTest.add(line);
+          } else if(mode == 1){
+            toBeSyncedFilesTest.add(line);
+          }
+        }
+      }
+    }
+    assert count == 202;
+    assert deletedFileNames.size() == deletedFileNamesTest.size();
+    assert toBeSyncedFiles.size() == toBeSyncedFilesTest.size();
+    assert deletedFileNames.containsAll(deletedFileNamesTest);
+    assert toBeSyncedFiles.containsAll(toBeSyncedFilesTest);
+  }
+}
\ No newline at end of file


[incubator-iotdb] 02/03: complete load tsfiles module

Posted by lt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 1785cab5b4244c5251a9b43113b064e88913c5dd
Author: lta <li...@163.com>
AuthorDate: Mon Sep 2 17:22:39 2019 +0800

    complete load tsfiles module
---
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  11 +
 .../iotdb/db/engine/merge/task/MergeFileTask.java  |  23 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 247 +++++++++++++++++++--
 .../db/engine/storagegroup/TsFileResource.java     |   4 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    |  43 +++-
 .../sender/recover/ISyncSenderLogAnalyzer.java     |   3 +-
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   7 +-
 .../sync/sender/transfer/DataTransferManager.java  |   2 +-
 .../sender/recover/SyncSenderLogAnalyzerTest.java  |  25 +++
 9 files changed, 338 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 3e36516..c96ef2e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.PathErrorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.StorageEngineFailureException;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.qp.physical.crud.BatchInsertPlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
@@ -337,4 +338,14 @@ public class StorageEngine implements IService {
     return true;
   }
 
+
+  public void loadNewTsFile(File newTsFile, TsFileResource resource)
+      throws TsFileProcessorException {
+    processorMap.get(newTsFile.getParentFile().getName()).loadNewTsFile(newTsFile, resource);
+  }
+
+  public void deleteTsfile(File deletedTsfile){
+    processorMap.get(deletedTsfile.getParentFile().getName()).deleteTsfile(deletedTsfile);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index cf1f62d..68d3141 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -19,6 +19,9 @@
 
 package org.apache.iotdb.db.engine.merge.task;
 
+import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
+
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -148,6 +151,12 @@ class MergeFileTask {
       logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
 
       newFileWriter.getFile().delete();
+
+      File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
+      FileUtils.moveFile(seqFile.getFile(), nextMergeVersionFile);
+      FileUtils.moveFile(new File(seqFile.getFile(), TsFileResource.RESOURCE_SUFFIX),
+          new File(nextMergeVersionFile, TsFileResource.RESOURCE_SUFFIX));
+      seqFile.setFile(nextMergeVersionFile);
     } finally {
       seqFile.getMergeQueryLock().writeLock().unlock();
     }
@@ -209,12 +218,24 @@ class MergeFileTask {
       TsFileMetaDataCache.getInstance().remove(seqFile);
       DeviceMetaDataCache.getInstance().remove(seqFile);
       seqFile.getFile().delete();
-      FileUtils.moveFile(fileWriter.getFile(), seqFile.getFile());
+
+      File nextMergeVersionFile = getNextMergeVersionFile(seqFile.getFile());
+      FileUtils.moveFile(fileWriter.getFile(), nextMergeVersionFile);
+      FileUtils.moveFile(new File(seqFile.getFile(), TsFileResource.RESOURCE_SUFFIX),
+          new File(nextMergeVersionFile, TsFileResource.RESOURCE_SUFFIX));
+      seqFile.setFile(nextMergeVersionFile);
     } finally {
       seqFile.getMergeQueryLock().writeLock().unlock();
     }
   }
 
+  private File getNextMergeVersionFile(File seqFile) {
+    String[] splits = seqFile.getName().replace(TSFILE_SUFFIX, "").split("-");
+    int mergeVersion = Integer.parseInt(splits[2]) + 1;
+    return new File(seqFile.getParentFile(),
+        splits[0] + "-" + splits[1] + "-" + mergeVersion + TSFILE_SUFFIX);
+  }
+
   private long writeUnmergedChunks(List<Long> chunkStartTimes,
       List<ChunkMetaData> chunkMetaDataList, TsFileSequenceReader reader,
       RestorableTsFileIOWriter fileWriter) throws IOException {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 1e41c57..4da8e30 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -38,6 +38,7 @@ import java.util.Set;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
@@ -67,17 +68,17 @@ import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
 import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
 import org.apache.iotdb.db.query.context.QueryContext;
 import org.apache.iotdb.db.query.control.JobFileManager;
-import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.db.utils.CopyOnReadLinkedList;
 import org.apache.iotdb.db.writelog.recover.TsFileRecoverPerformer;
+import org.apache.iotdb.rpc.TSStatusType;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.utils.Pair;
-import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -206,7 +207,8 @@ public class StorageGroupProcessor {
 
     try {
       // collect TsFiles from sequential and unsequential data directory
-      List<TsFileResource> seqTsFiles = getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+      List<TsFileResource> seqTsFiles = getAllFiles(
+          DirectoryManager.getInstance().getAllSequenceFileFolders());
       List<TsFileResource> unseqTsFiles =
           getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
 
@@ -222,7 +224,8 @@ public class StorageGroupProcessor {
           storageGroupSysDir.getPath(), this::mergeEndAction, taskName,
           IoTDBDescriptor.getInstance().getConfig().isForceFullMerge(), storageGroupName);
       logger.info("{} a RecoverMergeTask {} starts...", storageGroupName, taskName);
-      recoverMergeTask.recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
+      recoverMergeTask
+          .recoverMerge(IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot());
       if (!IoTDBDescriptor.getInstance().getConfig().isContinueMergeAfterReboot()) {
         mergingMods.delete();
       }
@@ -297,7 +300,7 @@ public class StorageGroupProcessor {
   }
 
   // TsFileNameComparator compares TsFiles by the version number in its name
-  // ({systemTime}-{versionNum}.tsfile)
+  // ({systemTime}-{versionNum}-{mergeNum}.tsfile)
   public int compareFileName(File o1, File o2) {
     String[] items1 = o1.getName().replace(TSFILE_SUFFIX, "").split("-");
     String[] items2 = o2.getName().replace(TSFILE_SUFFIX, "").split("-");
@@ -397,7 +400,8 @@ public class StorageGroupProcessor {
     boolean result = tsFileProcessor.insertBatch(batchInsertPlan, indexes, results);
 
     // try to update the latest time of the device of this tsRecord
-    if (result && latestTimeForEachDevice.get(batchInsertPlan.getDeviceId()) < batchInsertPlan.getMaxTime()) {
+    if (result && latestTimeForEachDevice.get(batchInsertPlan.getDeviceId()) < batchInsertPlan
+        .getMaxTime()) {
       latestTimeForEachDevice.put(batchInsertPlan.getDeviceId(), batchInsertPlan.getMaxTime());
     }
 
@@ -472,8 +476,9 @@ public class StorageGroupProcessor {
           e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     } catch (IOException e) {
-      logger.error("meet IOException when creating TsFileProcessor, change system mode to read-only",
-          e);
+      logger
+          .error("meet IOException when creating TsFileProcessor, change system mode to read-only",
+              e);
       IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     }
     return tsFileProcessor;
@@ -490,7 +495,7 @@ public class StorageGroupProcessor {
     new File(baseDir, storageGroupName).mkdirs();
 
     String filePath = Paths.get(baseDir, storageGroupName,
-        System.currentTimeMillis() + "-" + versionController.nextVersion()).toString()
+        System.currentTimeMillis() + "-" + versionController.nextVersion()).toString() + "-0"
         + TSFILE_SUFFIX;
 
     if (sequence) {
@@ -552,7 +557,7 @@ public class StorageGroupProcessor {
       this.latestFlushedTimeForEachDevice.clear();
       this.latestTimeForEachDevice.clear();
     } catch (IOException e) {
-      logger.error("Cannot delete files in storage group {}, because", storageGroupName, e);
+      logger.error("Cannot delete files in storage group {}", storageGroupName, e);
     } finally {
       writeUnlock();
     }
@@ -608,7 +613,8 @@ public class StorageGroupProcessor {
           deviceId, measurementId, context);
       List<TsFileResource> unseqResources = getFileReSourceListForQuery(unSequenceFileList,
           deviceId, measurementId, context);
-      QueryDataSource dataSource =  new QueryDataSource(new Path(deviceId, measurementId), seqResources, unseqResources);
+      QueryDataSource dataSource = new QueryDataSource(new Path(deviceId, measurementId),
+          seqResources, unseqResources);
       // used files should be added before mergeLock is unlocked, or they may be deleted by
       // running merge
       // is null only in tests
@@ -855,8 +861,10 @@ public class StorageGroupProcessor {
         mergeResource.setCacheDeviceMeta(true);
 
         MergeTask mergeTask = new MergeTask(mergeResource, storageGroupSysDir.getPath(),
-            this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(), storageGroupName);
-        mergingModification = new ModificationFile(storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
+            this::mergeEndAction, taskName, fullMerge, fileSelector.getConcurrentMergeNum(),
+            storageGroupName);
+        mergingModification = new ModificationFile(
+            storageGroupSysDir + File.separator + MERGING_MODIFICAITON_FILE_NAME);
         MergeManager.getINSTANCE().submitMainTask(mergeTask);
         if (logger.isInfoEnabled()) {
           logger.info("{} submits a merge task {}, merging {} seqFiles, {} unseqFiles",
@@ -950,6 +958,219 @@ public class StorageGroupProcessor {
     logger.info("{} a merge task ends", storageGroupName);
   }
 
+  /**
+   * Register a new tsfile with this storage group processor.
+   *
+   * Step 1: compare the per-device time ranges of the new file with every file in the sequence
+   * list. If the ranges intersect for some device, or the new file lies before an existing file
+   * for one device and after it for another, the new file is loaded as an unsequence file.
+   * Otherwise it is loaded as a sequence file: right before the first file it entirely precedes;
+   * failing that, right after the last file it entirely follows; failing that, at the end.
+   *
+   * Step 2: execute the loading process by the type, see loadTsFileByType.
+   *
+   * Step 3: update latestTimeForEachDevice and latestFlushedTimeForEachDevice.
+   *
+   * @param newTsFile new tsfile
+   * @param newTsFileResource tsfile resource
+   * @throws TsFileProcessorException if loading the file fails
+   * @UsedBy sync module.
+   */
+  public void loadNewTsFile(File newTsFile, TsFileResource newTsFileResource)
+      throws TsFileProcessorException {
+    writeLock();
+    mergeLock.writeLock().lock();
+    try {
+      boolean overlapped = false;
+      int lastPrecedingIdx = -1;
+      int firstFollowingIdx = sequenceFileList.size();
+      // check new tsfile
+      scan:
+      for (int idx = 0; idx < sequenceFileList.size(); idx++) {
+        TsFileResource current = sequenceFileList.get(idx);
+        if (idx == sequenceFileList.size() - 1 && current.getEndTimeMap().isEmpty()) {
+          // the last file has no end time recorded, so there is nothing to compare with
+          continue;
+        }
+        // number of shared devices for which the new file lies after / before the current file
+        int newIsLaterCnt = 0;
+        int newIsEarlierCnt = 0;
+        for (String device : newTsFileResource.getStartTimeMap().keySet()) {
+          if (!current.getStartTimeMap().containsKey(device)) {
+            continue;
+          }
+          long currentStart = current.getStartTimeMap().get(device);
+          long currentEnd = current.getEndTimeMap().get(device);
+          long newStart = newTsFileResource.getStartTimeMap().get(device);
+          long newEnd = newTsFileResource.getEndTimeMap().get(device);
+          if (currentStart > newEnd) {
+            newIsEarlierCnt++;
+          } else if (newStart > currentEnd) {
+            newIsLaterCnt++;
+          } else {
+            // the two time ranges intersect
+            overlapped = true;
+            break scan;
+          }
+        }
+        if (newIsEarlierCnt != 0) {
+          if (newIsLaterCnt != 0) {
+            // before the current file for one device but after it for another
+            overlapped = true;
+          } else {
+            firstFollowingIdx = idx;
+          }
+          break;
+        }
+        if (newIsLaterCnt != 0) {
+          lastPrecedingIdx = idx;
+        }
+      }
+
+      // loading tsfile by type
+      if (overlapped) {
+        loadTsFileByType(-1, newTsFile, newTsFileResource, unSequenceFileList.size());
+      } else {
+        int insertPosition;
+        if (firstFollowingIdx != sequenceFileList.size()) {
+          insertPosition = firstFollowingIdx;
+        } else if (lastPrecedingIdx != -1) {
+          insertPosition = lastPrecedingIdx + 1;
+        } else {
+          insertPosition = sequenceFileList.size();
+        }
+        loadTsFileByType(1, newTsFile, newTsFileResource, insertPosition);
+      }
+
+      // update latest time map
+      updateLatestTimeMap(newTsFileResource);
+    } catch (TsFileProcessorException e) {
+      logger.error("Failed to append the tsfile {} to storage group processor {}.",
+          newTsFile.getAbsolutePath(), newTsFile.getParentFile().getName());
+      throw new TsFileProcessorException(e);
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Raise the entries of latestTimeForEachDevice and latestFlushedTimeForEachDevice to the end
+   * times of the given tsfile, for every device whose entry is missing or smaller.
+   *
+   * @param newTsFileResource resource whose end time map is applied
+   * @UsedBy sync module
+   */
+  private void updateLatestTimeMap(TsFileResource newTsFileResource) {
+    for (Entry<String, Long> deviceEndTime : newTsFileResource.getEndTimeMap().entrySet()) {
+      String device = deviceEndTime.getKey();
+      long endTime = deviceEndTime.getValue();
+
+      boolean latestOutdated = !latestTimeForEachDevice.containsKey(device)
+          || latestTimeForEachDevice.get(device) < endTime;
+      if (latestOutdated) {
+        latestTimeForEachDevice.put(device, endTime);
+      }
+
+      boolean flushedOutdated = !latestFlushedTimeForEachDevice.containsKey(device)
+          || latestFlushedTimeForEachDevice.get(device) < endTime;
+      if (flushedOutdated) {
+        latestFlushedTimeForEachDevice.put(device, endTime);
+      }
+    }
+  }
+
+  /**
+   * Execute the loading process by the type: move the tsfile and its .resource file into the
+   * data directory, then register the resource in the matching file list.
+   *
+   * The target location is derived from the file to be loaded: five directory levels above it,
+   * then the sequence/unsequence folder, then the name of the file's parent directory.
+   *
+   * A file is moved only when its target does not exist yet. The resource is added to the list
+   * only after both files are in place, so a failed move never leaves a list entry that points
+   * to a missing file.
+   *
+   * @param type load type: -1 unsequence tsfile ; any other value (callers pass 1) sequence
+   * tsfile
+   * @param tsFile tsfile to be loaded
+   * @param tsFileResource tsfile resource to be loaded
+   * @param index the index in sequenceFileList/unSequenceFileList
+   * @throws TsFileProcessorException if a file exists neither at its origin nor at its target,
+   * or if renaming a file fails
+   * @UsedBy sync module
+   */
+  private void loadTsFileByType(int type, File tsFile, TsFileResource tsFileResource, int index)
+      throws TsFileProcessorException {
+    boolean isUnsequence = type == -1;
+    String typeFolderName = isUnsequence ? IoTDBConstant.UNSEQUENCE_FLODER_NAME
+        : IoTDBConstant.SEQUENCE_FLODER_NAME;
+    File baseDir = tsFile.getParentFile().getParentFile().getParentFile().getParentFile()
+        .getParentFile();
+    File targetFile = new File(baseDir, typeFolderName
+        + File.separatorChar + tsFile.getParentFile().getName() + File.separatorChar
+        + tsFile.getName());
+    // The .resource file is a sibling named <tsfile path> + RESOURCE_SUFFIX. Building it with
+    // new File(tsFile, suffix) would instead address a child entry inside the tsfile path.
+    File resourceFile = new File(tsFile.getPath() + TsFileResource.RESOURCE_SUFFIX);
+    File targetResourceFile = new File(targetFile.getPath() + TsFileResource.RESOURCE_SUFFIX);
+
+    // move file from sync dir to data dir
+    if (!targetFile.getParentFile().exists()) {
+      targetFile.getParentFile().mkdirs();
+    }
+    if (!resourceFile.exists() && !targetResourceFile.exists()) {
+      throw new TsFileProcessorException(
+          String
+              .format("The new .resource file {%s} to be loaded does not exist.",
+                  tsFile.getAbsolutePath()));
+    }
+    if (!targetResourceFile.exists() && !resourceFile.renameTo(targetResourceFile)) {
+      throw new TsFileProcessorException(String.format(
+          "File renaming failed when loading .resource file. Origin: %s, Target: %s",
+          resourceFile.getAbsolutePath(), targetResourceFile.getAbsolutePath()));
+    }
+    if (!tsFile.exists() && !targetFile.exists()) {
+      throw new TsFileProcessorException(String
+          .format("The new tsfile {%s} to be loaded does not exist.",
+              tsFile.getAbsolutePath()));
+    }
+    if (!targetFile.exists() && !tsFile.renameTo(targetFile)) {
+      throw new TsFileProcessorException(String.format(
+          "File renaming failed when loading tsfile. Origin: %s, Target: %s",
+          tsFile.getAbsolutePath(), targetFile.getAbsolutePath()));
+    }
+
+    // both files are in place: point the resource at the moved tsfile and register it
+    tsFileResource.setFile(targetFile);
+    if (isUnsequence) {
+      unSequenceFileList.add(index, tsFileResource);
+    } else {
+      sequenceFileList.add(index, tsFileResource);
+    }
+  }
+
+  /**
+   * Delete the given tsfile if this storage group processor manages a file with the same name.
+   *
+   * Firstly, remove the TsFileResource from sequenceFileList/unSequenceFileList (the sequence
+   * list is searched first; only the first match is removed).
+   *
+   * Secondly, delete the tsfile and its .resource file while holding the resource's merge-query
+   * write lock. A file that cannot be deleted is reported with a warning.
+   *
+   * @param deletedTsfile tsfile to be deleted; only its name is used for matching
+   * @UsedBy sync module.
+   */
+  public void deleteTsfile(File deletedTsfile) {
+    writeLock();
+    mergeLock.writeLock().lock();
+    TsFileResource deletedTsFileResource;
+    try {
+      deletedTsFileResource = removeResourceByName(sequenceFileList, deletedTsfile.getName());
+      if (deletedTsFileResource == null) {
+        deletedTsFileResource = removeResourceByName(unSequenceFileList,
+            deletedTsfile.getName());
+      }
+    } finally {
+      mergeLock.writeLock().unlock();
+      writeUnlock();
+    }
+    if (deletedTsFileResource == null) {
+      return;
+    }
+    deletedTsFileResource.getMergeQueryLock().writeLock().lock();
+    try {
+      File tsFile = deletedTsFileResource.getFile();
+      // the .resource file is a sibling named <tsfile path> + RESOURCE_SUFFIX, not a child of
+      // the tsfile path
+      File resourceFile = new File(tsFile.getPath() + TsFileResource.RESOURCE_SUFFIX);
+      if (!tsFile.delete() && tsFile.exists()) {
+        logger.warn("Failed to delete tsfile {}", tsFile.getAbsolutePath());
+      }
+      if (!resourceFile.delete() && resourceFile.exists()) {
+        logger.warn("Failed to delete .resource file {}", resourceFile.getAbsolutePath());
+      }
+    } finally {
+      deletedTsFileResource.getMergeQueryLock().writeLock().unlock();
+    }
+  }
+
+  /**
+   * Remove and return the first resource whose file has the given name.
+   *
+   * @param resources the resources to search; the match is removed through the iterator
+   * @param fileName file name to look for
+   * @return the removed resource, or null if no resource matches
+   */
+  private TsFileResource removeResourceByName(Iterable<TsFileResource> resources,
+      String fileName) {
+    Iterator<TsFileResource> iterator = resources.iterator();
+    while (iterator.hasNext()) {
+      TsFileResource resource = iterator.next();
+      if (resource.getFile().getName().equals(fileName)) {
+        iterator.remove();
+        return resource;
+      }
+    }
+    return null;
+  }
 
   public TsFileProcessor getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 3a7af7e..1aa2136 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -189,6 +189,10 @@ public class TsFileResource {
     return modFile;
   }
 
+  /**
+   * Replace the file this resource refers to.
+   *
+   * @param file the new file
+   */
+  public void setFile(File file) {
+    this.file = file;
+  }
+
   public boolean containsDevice(String deviceId) {
     return startTimeMap.containsKey(deviceId);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 19f7909..6bd7d49 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.sync.receiver.load;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayDeque;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.TsFileProcessorException;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,10 +79,15 @@ public class FileLoader implements IFileLoader {
           }
         }
         if (!queue.isEmpty()) {
-          handleLoadTask(queue.poll());
+          LoadTask task = queue.poll();
+          try {
+            handleLoadTask(task);
+          }catch (IOException e){
+            LOGGER.error("Can not load task {}", task, e);
+          }
         }
       }
-    } catch (InterruptedException | IOException e) {
+    } catch (InterruptedException e) {
       LOGGER.error("Can not handle load task", e);
     }
   };
@@ -111,32 +119,39 @@ public class FileLoader implements IFileLoader {
   public void handleLoadTask(LoadTask task) throws IOException {
+    // dispatch on the task type: ADD loads a new tsfile, DELETE removes a deleted one
     switch (task.type) {
       case ADD:
-        loadDeletedFile(task.file);
+        loadNewTsfile(task.file);
         break;
       case DELETE:
-        loadNewTsfile(task.file);
+        loadDeletedFile(task.file);
         break;
       default:
+        // any other type is only logged; nothing is done with the task's file
         LOGGER.error("Wrong load task type {}", task.type);
     }
   }
 
+  /**
+   * Delete a tsfile through the storage engine and record it in the load log as a loaded
+   * deleted file. The load log is switched to the deleted-files section first if it is not
+   * already there.
+   *
+   * @param deletedTsFile the tsfile to delete
+   * @throws IOException propagated from the calls made here (presumably load log writes)
+   */
-  private void loadDeletedFile(File file) throws IOException {
+  private void loadDeletedFile(File deletedTsFile) throws IOException {
     if (curType != LoadType.DELETE) {
       loadLog.startLoadDeletedFiles();
       curType = LoadType.DELETE;
     }
-    // TODO load deleted file
-    loadLog.finishLoadDeletedFile(file);
+    StorageEngine.getInstance().deleteTsfile(deletedTsFile);
+    loadLog.finishLoadDeletedFile(deletedTsFile);
   }
 
+  /**
+   * Load a new tsfile through the storage engine and record it in the load log as a loaded
+   * tsfile. The load log is switched to the tsfiles section first if it is not already there.
+   *
+   * @param newTsFile the tsfile to load
+   * @throws IOException propagated from deserializing the resource or from the load log
+   */
-  private void loadNewTsfile(File file) throws IOException {
+  private void loadNewTsfile(File newTsFile) throws IOException {
     if (curType != LoadType.ADD) {
       loadLog.startLoadTsFiles();
       curType = LoadType.ADD;
     }
-    // TODO load new tsfile
-    loadLog.finishLoadTsfile(file);
+    // the resource wraps the tsfile itself, not a path built from the suffix;
+    // presumably deSerialize() reads the sibling <tsfile>.resource - TODO confirm
+    TsFileResource tsFileResource = new TsFileResource(newTsFile);
+    tsFileResource.deSerialize();
+    try {
+      StorageEngine.getInstance().loadNewTsFile(newTsFile, tsFileResource);
+    } catch (TsFileProcessorException e) {
+      // NOTE(review): a failed load is still recorded as finished below - confirm that it
+      // should not be retried
+      LOGGER.error("Can not load new tsfile {}", newTsFile.getAbsolutePath(), e);
+    }
+    loadLog.finishLoadTsfile(newTsFile);
   }
 
 
@@ -165,5 +180,13 @@ public class FileLoader implements IFileLoader {
       this.file = file;
       this.type = type;
     }
+
+    /**
+     * Render the task as LoadTask{file=..., type=...}; used when a task is written to the log.
+     */
+    @Override
+    public String toString() {
+      StringBuilder text = new StringBuilder("LoadTask{");
+      text.append("file=").append(file);
+      text.append(", type=").append(type);
+      text.append('}');
+      return text.toString();
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
index d0c09b1..3d7a356 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/ISyncSenderLogAnalyzer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.sync.sender.recover;
 
+import java.io.IOException;
 import java.util.Set;
 
 /**
@@ -27,7 +28,7 @@ import java.util.Set;
  */
 public interface ISyncSenderLogAnalyzer {
 
-  void recover();
+  void recover() throws IOException;
 
   void loadLastLocalFiles(Set<String> lastLocalFiles);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index af4f00c..6eaec50 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -26,6 +26,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.Set;
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,18 +34,20 @@ import org.slf4j.LoggerFactory;
 public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzer.class);
+  private String senderPath;
   private File currentLocalFile;
   private File lastLocalFile;
   private File syncLogFile;
 
+  /**
+   * Create an analyzer for the given sender folder. The folder path is kept, and the current
+   * local file, the last local file and the sync log file are resolved inside it.
+   *
+   * @param senderPath path of the sender folder
+   */
   public SyncSenderLogAnalyzer(String senderPath) {
+    this.senderPath = senderPath;
     this.currentLocalFile = new File(senderPath, Constans.CURRENT_LOCAL_FILE_NAME);
     this.lastLocalFile = new File(senderPath, Constans.LAST_LOCAL_FILE_NAME);
     this.syncLogFile = new File(senderPath, Constans.SYNC_LOG_NAME);
   }
 
   @Override
-  public void recover() {
+  public void recover() throws IOException {
     if (currentLocalFile.exists() && !lastLocalFile.exists()) {
       currentLocalFile.renameTo(lastLocalFile);
     } else {
@@ -57,6 +60,8 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
       lastLocalFiles.addAll(newFiles);
       clearLogger(lastLocalFiles);
     }
+    FileUtils.deleteDirectory(new File(senderPath, Constans.DATA_SNAPSHOT_NAME));
+    syncLogFile.delete();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index 4969e19..6331d2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -257,7 +257,7 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
+  /**
+   * Run the sync sender log recovery on the configured sender folder.
+   *
+   * @throws IOException if the recovery fails
+   */
-  private void checkRecovery() {
+  private void checkRecovery() throws IOException {
     new SyncSenderLogAnalyzer(config.getSenderFolderPath()).recover();
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
index 6b65776..c30eb8f 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -100,6 +100,31 @@ public class SyncSenderLogAnalyzerTest {
       assert lastFilesMap.get(entry.getKey()).size() == entry.getValue().size();
       assert lastFilesMap.get(entry.getKey()).containsAll(entry.getValue());
     }
+
+    // delete some files
+    assert !new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME).exists();
+    senderLogger = new SyncSenderLogger(
+        new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME));
+    manager.getValidFiles(dataDir);
+    assert !isEmpty(manager.getLastLocalFilesMap());
+    senderLogger.startSyncDeletedFilesName();
+    for(Set<File> newTsFiles:allFileList.values()){
+      for(File file: newTsFiles){
+        senderLogger.finishSyncDeletedFileName(file);
+      }
+    }
+    senderLogger.close();
+    // recover log
+    senderLogAnalyzer.recover();
+    manager.getValidFiles(dataDir);
+    assert isEmpty(manager.getLastLocalFilesMap());
+    assert isEmpty(manager.getDeletedFilesMap());
+    Map<String, Set<File>> toBeSyncedFilesMap = manager.getToBeSyncedFilesMap();
+    for (Entry<String, Set<File>> entry : allFileList.entrySet()) {
+      assert toBeSyncedFilesMap.containsKey(entry.getKey());
+      assert toBeSyncedFilesMap.get(entry.getKey()).size() == entry.getValue().size();
+      assert toBeSyncedFilesMap.get(entry.getKey()).containsAll(entry.getValue());
+    }
   }
 
   private boolean isEmpty(Map<String, Set<File>> sendingFileList) {