You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/09/02 09:50:11 UTC

[incubator-iotdb] 01/03: add sync sender log analyzer

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit a81f28010c9e494f81f3924a198ed9e161ae7fc8
Author: lta <li...@163.com>
AuthorDate: Fri Aug 30 21:23:36 2019 +0800

    add sync sender log analyzer
---
 .../sync/sender/recover/SyncSenderLogAnalyzer.java |   3 +
 .../db/sync/sender/manage/SyncFileManagerTest.java |  18 ++++
 .../sender/recover/SyncSenderLogAnalyzerTest.java  | 114 +++++++++++++++++++++
 .../sync/sender/recover/SyncSenderLoggerTest.java  | 107 +++++++++++++++++++
 4 files changed, 242 insertions(+)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
index 60d45ed..af4f00c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzer.java
@@ -61,6 +61,9 @@ public class SyncSenderLogAnalyzer implements ISyncSenderLogAnalyzer {
 
   @Override
   public void loadLastLocalFiles(Set<String> lastLocalFiles) {
+    if (!lastLocalFile.exists()) {
+      return;
+    }
     try (BufferedReader br = new BufferedReader(new FileReader(lastLocalFile))) {
       String line;
       while ((line = br.readLine()) != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
index 91e59d3..46f339c 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/manage/SyncFileManagerTest.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.sender.manage;
 
 import java.io.BufferedWriter;
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
new file mode 100644
index 0000000..6b65776
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLogAnalyzerTest.java
@@ -0,0 +1,114 @@
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.sync.sender.manage.SyncFileManager;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that, once a sync log has been written through {@link SyncSenderLogger} and
+ * {@link SyncSenderLogAnalyzer#recover()} has run, {@link SyncFileManager} reports exactly the
+ * logged files as its "last local" files.
+ */
+public class SyncSenderLogAnalyzerTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncSenderLogAnalyzerTest.class);
+  private SyncSenderLogger senderLogger;
+  private SyncSenderLogAnalyzer senderLogAnalyzer;
+  private SyncFileManager manager = SyncFileManager.getInstance();
+  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private String dataDir;
+
+  /**
+   * Runs the common environment set-up, derives {@code dataDir} as the parent of the next
+   * sequence-file folder, updates the sender config with it, and creates a logger and an analyzer
+   * that both work on the configured sender folder.
+   */
+  @Before
+  public void setUp()
+      throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException {
+    EnvironmentUtils.envSetUp();
+    dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+        .getParentFile().getAbsolutePath();
+    config.update(dataDir);
+    senderLogger = new SyncSenderLogger(
+        new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME));
+    senderLogAnalyzer = new SyncSenderLogAnalyzer(config.getSenderFolderPath());
+  }
+
+  /** Cleans the test environment created in {@link #setUp()}. */
+  @After
+  public void tearDown() throws InterruptedException, IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Creates 3 folders with 5 randomly named files each (plus a resource companion per file), logs
+   * every file as synced, runs recovery and verifies that the manager's last-local-files map
+   * matches the generated files folder by folder.
+   *
+   * <p>JUnit assertions are used instead of the {@code assert} keyword so the checks still run
+   * when the JVM is started without {@code -ea}.
+   */
+  @Test
+  public void recover() throws IOException {
+    Map<String, Set<File>> allFileList = new HashMap<>();
+
+    // Fixed seed keeps the generated file names identical on every run.
+    Random r = new Random(0);
+    for (int i = 0; i < 3; i++) {
+      // The folder index doubles as the map key the manager is expected to report.
+      Set<File> filesOfFolder = new HashSet<>();
+      allFileList.put(String.valueOf(i), filesOfFolder);
+      for (int j = 0; j < 5; j++) {
+        String rand = String.valueOf(r.nextInt(10000));
+        String fileName = FilePathUtils.regularizePath(dataDir) + IoTDBConstant.SEQUENCE_FLODER_NAME
+            + File.separator + i
+            + File.separator + rand;
+        File file = new File(fileName);
+        filesOfFolder.add(file);
+        if (!file.getParentFile().exists()) {
+          file.getParentFile().mkdirs();
+        }
+        createIfAbsent(file);
+        createIfAbsent(new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX));
+      }
+    }
+
+    // Before any sync log exists, no last-local files are expected.
+    // NOTE(review): getValidFiles presumably refreshes the manager's state for dataDir - confirm.
+    manager.getValidFiles(dataDir);
+    org.junit.Assert.assertTrue(isEmpty(manager.getLastLocalFilesMap()));
+
+    // Write a sync log that records every generated file as synced.
+    senderLogger.startSyncTsFiles();
+    for (Set<File> newTsFiles : allFileList.values()) {
+      for (File file : newTsFiles) {
+        senderLogger.finishSyncTsfile(file);
+      }
+    }
+    senderLogger.close();
+
+    // recover log
+    senderLogAnalyzer.recover();
+    manager.getValidFiles(dataDir);
+    org.junit.Assert.assertFalse(isEmpty(manager.getLastLocalFilesMap()));
+    Map<String, Set<File>> lastFilesMap = manager.getLastLocalFilesMap();
+    for (Entry<String, Set<File>> entry : allFileList.entrySet()) {
+      org.junit.Assert.assertTrue(lastFilesMap.containsKey(entry.getKey()));
+      Set<File> recovered = lastFilesMap.get(entry.getKey());
+      org.junit.Assert.assertEquals(entry.getValue().size(), recovered.size());
+      org.junit.Assert.assertTrue(recovered.containsAll(entry.getValue()));
+    }
+  }
+
+  /**
+   * Creates {@code file} when it does not exist yet and logs an error naming that same file if
+   * the creation is refused.
+   *
+   * @param file the file to create
+   * @throws IOException if the underlying creation fails with an I/O error
+   */
+  private void createIfAbsent(File file) throws IOException {
+    if (!file.exists() && !file.createNewFile()) {
+      LOGGER.error("Can not create new file {}", file.getPath());
+    }
+  }
+
+  /**
+   * Tells whether the map holds no files at all.
+   *
+   * @param sendingFileList map from a key to its set of files
+   * @return {@code true} if the map is empty or every set in it is empty
+   */
+  private boolean isEmpty(Map<String, Set<File>> sendingFileList) {
+    for (Entry<String, Set<File>> entry : sendingFileList.entrySet()) {
+      if (!entry.getValue().isEmpty()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
new file mode 100644
index 0000000..3fe2e33
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/sync/sender/recover/SyncSenderLoggerTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.sender.recover;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.StartupException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderConfig;
+import org.apache.iotdb.db.sync.sender.conf.SyncSenderDescriptor;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that {@link SyncSenderLogger} writes one line per section marker and one line per
+ * logged file, and that the file lines match the absolute paths of the files passed in.
+ */
+public class SyncSenderLoggerTest {
+
+  private SyncSenderLogger senderLogger;
+  private SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig();
+  private String dataDir;
+
+  /**
+   * Runs the common environment set-up, derives {@code dataDir} as the parent of the next
+   * sequence-file folder and updates the sender config with it.
+   */
+  @Before
+  public void setUp()
+      throws IOException, InterruptedException, StartupException, DiskSpaceInsufficientException {
+    EnvironmentUtils.envSetUp();
+    dataDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile())
+        .getParentFile().getAbsolutePath();
+    config.update(dataDir);
+  }
+
+  /** Cleans the test environment created in {@link #setUp()}. */
+  @After
+  public void tearDown() throws InterruptedException, IOException, StorageEngineException {
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /**
+   * Logs 100 deleted-file names and 100 synced tsfiles, reads the log file back line by line and
+   * verifies the total line count and the content of both sections.
+   *
+   * <p>JUnit assertions are used instead of the {@code assert} keyword so the checks still run
+   * when the JVM is started without {@code -ea}. The logger is closed in a {@code finally} block
+   * after the log has been read, so the file handle is released even if reading fails.
+   */
+  @Test
+  public void testSyncSenderLogger() throws IOException {
+    File logFile = new File(config.getSenderFolderPath(), Constans.SYNC_LOG_NAME);
+    senderLogger = new SyncSenderLogger(logFile);
+    // "expected" sets hold what was handed to the logger, "...Test" sets what was read back.
+    Set<String> deletedFileNames = new HashSet<>();
+    Set<String> deletedFileNamesTest = new HashSet<>();
+    Set<String> toBeSyncedFiles = new HashSet<>();
+    Set<String> toBeSyncedFilesTest = new HashSet<>();
+    int count = 0;
+    try {
+      senderLogger.startSyncDeletedFilesName();
+      for (int i = 0; i < 100; i++) {
+        File deleted = new File(config.getSenderFolderPath(), "deleted" + i);
+        senderLogger.finishSyncDeletedFileName(deleted);
+        deletedFileNames.add(deleted.getAbsolutePath());
+      }
+      senderLogger.startSyncTsFiles();
+      for (int i = 0; i < 100; i++) {
+        File synced = new File(config.getSenderFolderPath(), "new" + i);
+        senderLogger.finishSyncTsfile(synced);
+        toBeSyncedFiles.add(synced.getAbsolutePath());
+      }
+
+      // mode tracks the current log section: 0 = none yet, -1 = deleted files, 1 = tsfiles.
+      int mode = 0;
+      try (BufferedReader br = new BufferedReader(new FileReader(logFile))) {
+        String line;
+        while ((line = br.readLine()) != null) {
+          count++;
+          if (line.equals(SyncSenderLogger.SYNC_DELETED_FILE_NAME_START)) {
+            mode = -1;
+          } else if (line.equals(SyncSenderLogger.SYNC_TSFILE_START)) {
+            mode = 1;
+          } else if (mode == -1) {
+            deletedFileNamesTest.add(line);
+          } else if (mode == 1) {
+            toBeSyncedFilesTest.add(line);
+          }
+        }
+      }
+    } finally {
+      senderLogger.close();
+    }
+    // 2 section markers + 100 deleted-file lines + 100 tsfile lines.
+    org.junit.Assert.assertEquals(202, count);
+    org.junit.Assert.assertEquals(deletedFileNames, deletedFileNamesTest);
+    org.junit.Assert.assertEquals(toBeSyncedFiles, toBeSyncedFilesTest);
+  }
+}
\ No newline at end of file