You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/30 12:38:01 UTC

[incubator-iotdb] 02/03: add sync log recovery in receiver end

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 8b5123486fb2ac84ce230134c144e066c734c006
Author: lta <li...@163.com>
AuthorDate: Fri Aug 30 16:20:22 2019 +0800

    add sync log recovery in receiver end
---
 .../iotdb/db/sync/receiver/SyncServerManager.java  |   7 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    |  26 ++--
 .../db/sync/receiver/load/FileLoaderManager.java   |   4 +
 .../iotdb/db/sync/receiver/load/ILoadLogger.java   |  18 +++
 .../iotdb/db/sync/receiver/load/LoadLogger.java    |  23 +++-
 .../LoadType.java}                                 |  11 +-
 .../receiver/recover/ISyncReceiverLogAnalyzer.java |  11 +-
 .../receiver/recover/SyncReceiverLogAnalyzer.java  | 152 +++++++++++++++++++++
 .../sync/receiver/recover/SyncReceiverLogger.java  |   5 +-
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  23 ++--
 .../apache/iotdb/db/sync/sender/conf/Constans.java |   2 -
 11 files changed, 245 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index faba821..c6e06cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.sync.receiver;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
@@ -28,6 +29,7 @@ import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
 import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
+import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
 import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
@@ -67,6 +69,11 @@ public class SyncServerManager implements IService {
       return;
     }
     FileLoaderManager.getInstance().start();
+    try {
+      SyncReceiverLogAnalyzer.getInstance().recoverAll();
+    } catch (IOException e) {
+      logger.error("Can not recover receiver sync state", e);
+    }
     if (conf.getIpWhiteList() == null) {
       logger.error(
           "Sync server failed to start because IP white list is null, please set IP white list.");
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index 8db09c0..d834487 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -56,14 +56,19 @@ public class FileLoader implements IFileLoader {
     return new FileLoader(senderName, syncFolderPath);
   }
 
+  public static FileLoader createFileLoader(File syncFolder)
+      throws IOException {
+    return new FileLoader(syncFolder.getName(), syncFolder.getAbsolutePath());
+  }
+
   private Runnable loadTaskRunner = () -> {
     try {
       while (true) {
-        if (endSync) {
-          cleanUp();
-          break;
-        }
         if (queue.isEmpty()) {
+          if (endSync) {
+            cleanUp();
+            break;
+          }
           synchronized (queue) {
             if (queue.isEmpty()) {
               queue.wait(WAIT_TIME);
@@ -97,7 +102,9 @@ public class FileLoader implements IFileLoader {
 
   @Override
   public void endSync() {
-    this.endSync = true;
+    if (!endSync && FileLoaderManager.getInstance().containsFileLoader(senderName)) {
+      this.endSync = true;
+    }
   }
 
   @Override
@@ -139,13 +146,16 @@ public class FileLoader implements IFileLoader {
       loadLog.close();
       new File(syncFolderPath, Constans.SYNC_LOG_NAME).deleteOnExit();
       new File(syncFolderPath, Constans.LOAD_LOG_NAME).deleteOnExit();
-      new File(syncFolderPath, Constans.SYNC_END).deleteOnExit();
       FileLoaderManager.getInstance().removeFileLoader(senderName);
     } catch (IOException e) {
       LOGGER.error("Can not clean up sync resource.", e);
     }
   }
 
+  public void setCurType(LoadType curType) {
+    this.curType = curType;
+  }
+
   class LoadTask {
 
     private File file;
@@ -156,8 +166,4 @@ public class FileLoader implements IFileLoader {
       this.type = type;
     }
   }
-
-  private enum LoadType {
-    DELETE, ADD, NONE
-  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
index 2b86777..8fc341a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
@@ -55,6 +55,10 @@ public class FileLoaderManager {
     return fileLoaderMap.get(senderName);
   }
 
+  public boolean containsFileLoader(String senderName){
+    return fileLoaderMap.containsKey(senderName);
+  }
+
   public void addLoadTaskRunner(Runnable taskRunner){
     loadTaskRunnerPool.submit(taskRunner);
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
index 2cad379..15fd8f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/ILoadLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
index 31b4c48..881a637 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.BufferedWriter;
@@ -45,6 +63,9 @@ public class LoadLogger implements ILoadLogger {
 
   @Override
   public void close() throws IOException {
-    bw.close();
+    if(bw != null) {
+      bw.close();
+      bw = null;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadType.java
similarity index 82%
copy from server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
copy to server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadType.java
index 8015fb2..4742093 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/LoadType.java
@@ -16,13 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.iotdb.db.sync.receiver.recover;
+package org.apache.iotdb.db.sync.receiver.load;
 
-public interface ISyncReceiverLogAnalyzer {
-
-  void recover();
-
-  void scanLogger(String path);
-
-  void clearLogger();
+public enum LoadType {
+    DELETE, ADD, NONE
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
index 8015fb2..a470de3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/ISyncReceiverLogAnalyzer.java
@@ -18,11 +18,16 @@
  */
 package org.apache.iotdb.db.sync.receiver.recover;
 
+import java.io.File;
+import java.io.IOException;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+
 public interface ISyncReceiverLogAnalyzer {
 
-  void recover();
+  void recoverAll() throws IOException;
+
+  boolean recover(String senderName) throws IOException;
 
-  void scanLogger(String path);
+  void scanLogger(FileLoader loader, File syncLog, File loadLog);
 
-  void clearLogger();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
new file mode 100644
index 0000000..4ed7284
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogAnalyzer.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.recover;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
+import org.apache.iotdb.db.sync.receiver.load.LoadLogger;
+import org.apache.iotdb.db.sync.receiver.load.LoadType;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.apache.iotdb.db.utils.FilePathUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SyncReceiverLogAnalyzer implements ISyncReceiverLogAnalyzer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SyncReceiverLogAnalyzer.class);
+
+  private static final int WAIT_TIMEOUT = 2000;
+
+  private SyncReceiverLogAnalyzer() {
+
+  }
+
+  public static SyncReceiverLogAnalyzer getInstance() {
+    return SyncReceiverLogAnalyzerHolder.INSTANCE;
+  }
+
+  @Override
+  public void recoverAll() throws IOException {
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    LOGGER.info("Start to recover all sync state for sync receiver.");
+    for (String dataDir : dataDirs) {
+      if (!new File(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER).exists()) {
+        continue;
+      }
+      for (File syncFolder : new File(
+          FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER)
+          .listFiles()) {
+        recover(syncFolder);
+      }
+    }
+    LOGGER.info("Finish to recover all sync state for sync receiver.");
+  }
+
+  private boolean recover(File senderFolder) throws IOException {
+    // check the state
+    if (!new File(senderFolder, Constans.SYNC_LOG_NAME).exists()) {
+      new File(senderFolder, Constans.LOAD_LOG_NAME).deleteOnExit();
+      return true;
+    }
+    if (FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName())) {
+      FileLoaderManager.getInstance().getFileLoader(senderFolder.getName()).endSync();
+      try {
+        Thread.sleep(WAIT_TIMEOUT);  // wait for file loader to clean up resource
+      } catch (InterruptedException e) {
+        LOGGER.error("Can not wait for recovery to complete.", e);
+      }
+    } else {
+      scanLogger(FileLoader.createFileLoader(senderFolder),
+          new File(senderFolder, Constans.SYNC_LOG_NAME),
+          new File(senderFolder, Constans.LOAD_LOG_NAME));
+    }
+    return FileLoaderManager.getInstance().containsFileLoader(senderFolder.getName());
+  }
+
+  @Override
+  public boolean recover(String senderName) throws IOException {
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    boolean recoverComplete = true;
+    for (String dataDir : dataDirs) {
+      if (!new File(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER).exists()) {
+        continue;
+      }
+      for (File syncFolder : new File(
+          FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER)
+          .listFiles()) {
+        if (syncFolder.getName().equals(senderName)) {
+          recoverComplete &= recover(syncFolder);
+        }
+      }
+    }
+    return recoverComplete;
+  }
+
+  @Override
+  public void scanLogger(FileLoader loader, File syncLog, File loadLog) {
+    LoadType loadType = LoadType.NONE;
+    try (BufferedReader syncReader = new BufferedReader(new FileReader(syncLog))) {
+      String line;
+      try (BufferedReader loadReader = new BufferedReader(new FileReader(loadLog))) {
+        while ((line = loadReader.readLine()) != null) {
+          if (line.equals(LoadLogger.LOAD_DELETED_FILE_NAME_START)) {
+            loadType = LoadType.DELETE;
+          } else if (line.equals(LoadLogger.LOAD_TSFILE_START)) {
+            loadType = LoadType.ADD;
+          } else {
+            while (!syncReader.readLine().equals(line)) {
+            }
+          }
+        }
+      }
+      loader.setCurType(loadType);
+      while ((line = syncReader.readLine()) != null) {
+        if (line.equals(SyncReceiverLogger.SYNC_DELETED_FILE_NAME_START)) {
+          loadType = LoadType.DELETE;
+        } else if (line.equals(SyncReceiverLogger.SYNC_TSFILE_START)) {
+          loadType = LoadType.ADD;
+        } else {
+          switch (loadType) {
+            case ADD:
+              loader.addTsfile(new File(line));
+              break;
+            case DELETE:
+              loader.addDeletedFileName(new File(line));
+              break;
+            default:
+              LOGGER.error("Wrong load type {}", loadType);
+          }
+        }
+      }
+      loader.endSync();
+    } catch (IOException e) {
+      LOGGER.error("Can not scan log for recovery", e);
+    }
+  }
+
+  private static class SyncReceiverLogAnalyzerHolder {
+
+    private static final SyncReceiverLogAnalyzer INSTANCE = new SyncReceiverLogAnalyzer();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
index b43cfa3..b940758 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
@@ -63,6 +63,9 @@ public class SyncReceiverLogger implements ISyncReceiverLogger {
 
   @Override
   public void close() throws IOException {
-    bw.close();
+    if(bw != null) {
+      bw.close();
+      bw = null;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 525244d..8d2d61c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.MetadataOperationType;
 import org.apache.iotdb.db.sync.receiver.load.FileLoader;
 import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
+import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogAnalyzer;
 import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.utils.FilePathUtils;
@@ -62,8 +63,6 @@ public class SyncServiceImpl implements SyncService.Iface {
 
   private static final Logger logger = LoggerFactory.getLogger(SyncServiceImpl.class);
 
-  private static final MManager METADATA_MANGER = MManager.getInstance();
-
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   private ThreadLocal<String> syncFolderPath = new ThreadLocal<>();
@@ -104,7 +103,8 @@ public class SyncServiceImpl implements SyncService.Iface {
       if (currentFileWriter.get() != null && currentFileWriter.get().isOpen()) {
         currentFileWriter.get().close();
       }
-      return true;
+      syncLog.get().close();
+      return SyncReceiverLogAnalyzer.getInstance().recover(senderName.get());
     } catch (IOException e) {
       logger.error("Check recovery state fail", e);
       return false;
@@ -269,32 +269,32 @@ public class SyncServiceImpl implements SyncService.Iface {
           kv = args[k].split("=");
           props.put(kv[0], kv[1]);
         }
-        METADATA_MANGER
+        MManager.getInstance()
             .addPathToMTree(new Path(args[1]), TSDataType.deserialize(Short.valueOf(args[2])),
                 TSEncoding.deserialize(Short.valueOf(args[3])),
                 CompressionType.deserialize(Short.valueOf(args[4])),
                 props);
         break;
       case MetadataOperationType.DELETE_PATH_FROM_MTREE:
-        METADATA_MANGER.deletePaths(Collections.singletonList(new Path(args[1])));
+        MManager.getInstance().deletePaths(Collections.singletonList(new Path(args[1])));
         break;
       case MetadataOperationType.SET_STORAGE_LEVEL_TO_MTREE:
-        METADATA_MANGER.setStorageLevelToMTree(args[1]);
+        MManager.getInstance().setStorageLevelToMTree(args[1]);
         break;
       case MetadataOperationType.ADD_A_PTREE:
-        METADATA_MANGER.addAPTree(args[1]);
+        MManager.getInstance().addAPTree(args[1]);
         break;
       case MetadataOperationType.ADD_A_PATH_TO_PTREE:
-        METADATA_MANGER.addPathToPTree(args[1]);
+        MManager.getInstance().addPathToPTree(args[1]);
         break;
       case MetadataOperationType.DELETE_PATH_FROM_PTREE:
-        METADATA_MANGER.deletePathFromPTree(args[1]);
+        MManager.getInstance().deletePathFromPTree(args[1]);
         break;
       case MetadataOperationType.LINK_MNODE_TO_PTREE:
-        METADATA_MANGER.linkMNodeToPTree(args[1], args[2]);
+        MManager.getInstance().linkMNodeToPTree(args[1], args[2]);
         break;
       case MetadataOperationType.UNLINK_MNODE_FROM_PTREE:
-        METADATA_MANGER.unlinkMNodeFromPTree(args[1], args[2]);
+        MManager.getInstance().unlinkMNodeFromPTree(args[1], args[2]);
         break;
       default:
         logger.error("Unrecognizable command {}", cmd);
@@ -305,7 +305,6 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus endSync() throws TException {
     try {
       syncLog.get().close();
-      new File(getSyncDataPath(), Constans.SYNC_END).createNewFile();
       FileLoaderManager.getInstance().getFileLoader(senderName.get()).endSync();
     } catch (IOException e) {
       logger.error("Can not end sync", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index d3f8ff1..d711aa5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -71,8 +71,6 @@ public class Constans {
 
   public static final String RECEIVER_DATA_FOLDER_NAME = "data";
 
-  public static final String SYNC_END = "sync.end";
-
   public static final String LOAD_LOG_NAME = "load.log";
 
 }