You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/08/30 02:43:43 UTC

[incubator-iotdb] 02/03: complete load module in receiver end

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch reimpl_sync
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 35f06a88c21e2fa99d03d44c4e1633197fcad5b4
Author: lta <li...@163.com>
AuthorDate: Thu Aug 29 21:00:57 2019 +0800

    complete load module in receiver end
---
 .../iotdb/db/sync/receiver/SyncServerManager.java  |   3 +
 .../iotdb/db/sync/receiver/load/FileLoader.java    | 105 +++++++++++++++++++--
 .../db/sync/receiver/load/FileLoaderManager.java   |  93 ++++++++++++++++++
 .../iotdb/db/sync/receiver/load/IFileLoader.java   |   6 ++
 .../sync/receiver/recover/SyncReceiverLogger.java  |  18 ++++
 .../db/sync/receiver/transfer/SyncServiceImpl.java |  18 ++--
 .../apache/iotdb/db/sync/sender/conf/Constans.java |   2 +
 .../db/sync/sender/conf/SyncSenderConfig.java      |   7 ++
 .../sync/sender/transfer/DataTransferManager.java  |  38 +++++++-
 service-rpc/src/main/thrift/sync.thrift            |   2 +-
 10 files changed, 275 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
index 1d37539..faba821 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/SyncServerManager.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.StartupException;
 import org.apache.iotdb.db.service.IService;
 import org.apache.iotdb.db.service.ServiceType;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
 import org.apache.iotdb.db.sync.receiver.transfer.SyncServiceImpl;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncService.Processor;
@@ -65,6 +66,7 @@ public class SyncServerManager implements IService {
     if (!conf.isSyncEnable()) {
       return;
     }
+    FileLoaderManager.getInstance().start();
     if (conf.getIpWhiteList() == null) {
       logger.error(
           "Sync server failed to start because IP white list is null, please set IP white list.");
@@ -83,6 +85,7 @@ public class SyncServerManager implements IService {
   @Override
   public void stop() {
     if (conf.isSyncEnable()) {
+      FileLoaderManager.getInstance().stop();
       ((SyncServiceThread) syncServerThread).close();
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
index c09b0ec..7681433 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoader.java
@@ -1,33 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
+import java.util.ArrayDeque;
+import org.apache.iotdb.db.sync.sender.conf.Constans;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FileLoader implements IFileLoader {
 
-  private FileLoader(){
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileLoader.class);
 
+  private static final int WAIT_TIME = 1000;
+
+  private File syncEndFile;
+
+  private String senderName;
+
+  private ArrayDeque<LoadTask> queue = new ArrayDeque<>();
+
+  private volatile boolean endSync = false;
+
+  private FileLoader(String senderName, String syncFolderPath) {
+    this.senderName = senderName;
+    this.syncEndFile = new File(syncFolderPath, Constans.SYNC_END);
+    FileLoaderManager.getInstance().addFileLoader(senderName, this);
+    FileLoaderManager.getInstance().addLoadTaskRunner(loadTaskRunner);
   }
 
-  public static FileLoader getInstance(){
-    return FileLoaderHolder.INSTANCE;
+  public static FileLoader createFileLoader(String senderName, String syncFolderPath) {
+    return new FileLoader(senderName, syncFolderPath);
   }
 
+  // Drains queued load tasks until endSync is set or the thread is interrupted.
+  private Runnable loadTaskRunner = () -> {
+    try {
+      while (!endSync) {
+        LoadTask task;
+        // ArrayDeque is not thread-safe: only touch it under the monitor the producers use.
+        synchronized (queue) {
+          if (queue.isEmpty()) {
+            queue.wait(WAIT_TIME);
+          }
+          task = queue.poll();
+        }
+        if (task != null) {
+          handleLoadTask(task);
+        }
+      }
+      cleanUp();
+    } catch (InterruptedException e) {
+      LOGGER.info("Close the run load task thread.");
+      Thread.currentThread().interrupt();
+    }
+  };
+
   @Override
   public void addDeletedFileName(String sgName, File deletedFile) {
-
+    synchronized (queue) {
+      queue.add(new LoadTask(sgName, deletedFile, LoadType.DELETE));
+      queue.notify();
+    }
   }
 
   @Override
   public void addTsfile(String sgName, File tsfile) {
+    synchronized (queue) {
+      queue.add(new LoadTask(sgName, tsfile, LoadType.ADD));
+      queue.notify();
+    }
+  }
+
+  @Override
+  public void endSync() {
+    this.endSync = true; // volatile flag; the load task runner polls it and then cleans up
+  }
+
+  @Override
+  public void handleLoadTask(LoadTask task) {
 
   }
 
+  @Override
+  public void cleanUp() {
+    FileLoaderManager.getInstance().removeFileLoader(senderName); // deregister this loader
+  }
+
+  class LoadTask {
 
-  private static class FileLoaderHolder{
-    private static final FileLoader INSTANCE = new FileLoader();
+    String sgName;
+    File file;
+    LoadType type;
 
-    private static FileLoader getInstance(){
-      return INSTANCE;
+    LoadTask(String sgName, File file, LoadType type) {
+      this.sgName = sgName;
+      this.file = file;
+      this.type = type;
     }
   }
+
+  private enum LoadType {
+    DELETE, ADD; // DELETE is queued by addDeletedFileName, ADD by addTsfile
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
new file mode 100644
index 0000000..2b86777
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/FileLoaderManager.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.sync.receiver.load;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FileLoaderManager {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FileLoaderManager.class);
+  private static final int WAIT_TIMEOUT = 2000;
+
+  // Never null, so a loader thread can still deregister itself while stop() is running.
+  private final ConcurrentHashMap<String, FileLoader> fileLoaderMap = new ConcurrentHashMap<>();
+  // Non-null only between start() and stop().
+  private volatile ExecutorService loadTaskRunnerPool;
+
+  private FileLoaderManager() {}
+
+  public static FileLoaderManager getInstance() {
+    return FileLoaderManagerHolder.INSTANCE;
+  }
+
+  public void addFileLoader(String senderName, FileLoader fileLoader) {
+    fileLoaderMap.put(senderName, fileLoader);
+  }
+
+  public void removeFileLoader(String senderName) {
+    fileLoaderMap.remove(senderName);
+  }
+
+  /** Returns the loader registered for the sender, or null if there is none. */
+  public FileLoader getFileLoader(String senderName) {
+    return fileLoaderMap.get(senderName);
+  }
+
+  /** Submits a loader's task loop to the pool, which exists only after start(). */
+  public void addLoadTaskRunner(Runnable taskRunner) {
+    loadTaskRunnerPool.submit(taskRunner);
+  }
+
+  /** Creates the runner pool unless one is already running. */
+  public synchronized void start() {
+    if (loadTaskRunnerPool == null) {
+      loadTaskRunnerPool = Executors.newCachedThreadPool();
+    }
+  }
+
+  /** Interrupts every runner and waits for the pool to exit; a no-op when not started. */
+  public synchronized void stop() {
+    fileLoaderMap.clear();
+    if (loadTaskRunnerPool == null) {
+      return;
+    }
+    loadTaskRunnerPool.shutdownNow();
+    int totalWaitTime = 0;
+    try {
+      while (!loadTaskRunnerPool.awaitTermination(WAIT_TIMEOUT, TimeUnit.MILLISECONDS)) {
+        totalWaitTime += WAIT_TIMEOUT;
+        LOGGER.info("File load manager thread pool doesn't exit after {}ms.", totalWaitTime);
+      }
+    } catch (InterruptedException e) {
+      LOGGER.error("Interrupted while waiting file load manager thread pool to exit. ", e);
+      Thread.currentThread().interrupt(); // restore the flag and give up waiting
+    } finally {
+      loadTaskRunnerPool = null;
+    }
+  }
+
+  private static class FileLoaderManagerHolder {
+    private static final FileLoaderManager INSTANCE = new FileLoaderManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
index c2d55b0..98aeca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/load/IFileLoader.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.sync.receiver.load;
 
 import java.io.File;
+import org.apache.iotdb.db.sync.receiver.load.FileLoader.LoadTask;
 
 public interface IFileLoader {
 
@@ -26,4 +27,9 @@ public interface IFileLoader {
 
   void addTsfile(String sgName, File tsfile);
 
+  void endSync();
+
+  void handleLoadTask(LoadTask task);
+
+  void cleanUp();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
index 9a8cc96..b43cfa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recover/SyncReceiverLogger.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.iotdb.db.sync.receiver.recover;
 
 import java.io.BufferedWriter;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
index 7156ef7..e2f45a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/transfer/SyncServiceImpl.java
@@ -43,6 +43,7 @@ import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.metadata.MetadataConstant;
 import org.apache.iotdb.db.metadata.MetadataOperationType;
 import org.apache.iotdb.db.sync.receiver.load.FileLoader;
+import org.apache.iotdb.db.sync.receiver.load.FileLoaderManager;
 import org.apache.iotdb.db.sync.receiver.recover.SyncReceiverLogger;
 import org.apache.iotdb.db.sync.sender.conf.Constans;
 import org.apache.iotdb.db.utils.FilePathUtils;
@@ -71,7 +72,7 @@ public class SyncServiceImpl implements SyncService.Iface {
 
   private ThreadLocal<SyncReceiverLogger> syncLog = new ThreadLocal<>();
 
-  private ThreadLocal<String> senderIp = new ThreadLocal<>();
+  private ThreadLocal<String> senderName = new ThreadLocal<>();
 
   private ThreadLocal<File> currentFile = new ThreadLocal<>();
 
@@ -83,10 +84,10 @@ public class SyncServiceImpl implements SyncService.Iface {
    * Verify IP address of sender
    */
   @Override
-  public ResultStatus check(String ipAddress) {
+  public ResultStatus check(String ipAddress, String uuid) {
     Thread.currentThread().setName(ThreadName.SYNC_SERVER.getName());
     if (SyncUtils.verifyIPSegment(config.getIpWhiteList(), ipAddress)) {
-      senderIp.set(ipAddress);
+      senderName.set(ipAddress + "-" + uuid);
       if (checkRecovery()) {
         return getSuccessResult();
       } else {
@@ -105,7 +106,7 @@ public class SyncServiceImpl implements SyncService.Iface {
       }
       return true;
     } catch (IOException e) {
-      e.printStackTrace();
+      logger.error("Check recovery state fail", e);
       return false;
     }
   }
@@ -115,6 +116,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     try {
       initPath();
       currentSG.remove();
+      FileLoader.createFileLoader(senderName.get(), syncFolderPath.get());
       syncLog.set(new SyncReceiverLogger(new File(getSyncDataPath(), Constans.SYNC_LOG_NAME)));
       return getSuccessResult();
     } catch (DiskSpaceInsufficientException | IOException e) {
@@ -130,7 +132,7 @@ public class SyncServiceImpl implements SyncService.Iface {
     String dataDir = DirectoryManager.getInstance().getNextFolderForSequenceFile();
     syncFolderPath
         .set(FilePathUtils.regularizePath(dataDir) + Constans.SYNC_RECEIVER + File.separatorChar
-            + senderIp.get());
+            + senderName.get());
   }
 
   /**
@@ -153,7 +155,8 @@ public class SyncServiceImpl implements SyncService.Iface {
   public ResultStatus syncDeletedFileName(String fileName) throws TException {
     try {
       syncLog.get().finishSyncDeletedFileName(new File(fileName));
-      FileLoader.getInstance().addDeletedFileName(currentSG.get(), new File(fileName));
+      FileLoaderManager.getInstance().getFileLoader(senderName.get())
+          .addDeletedFileName(currentSG.get(), new File(fileName));
     } catch (IOException e) {
       logger.error("Can not sync deleted file", e);
       return getErrorResult(
@@ -217,7 +220,8 @@ public class SyncServiceImpl implements SyncService.Iface {
         } else {
           if (!currentFile.get().getName().endsWith(TsFileResource.RESOURCE_SUFFIX)) {
             syncLog.get().finishSyncTsfile(currentFile.get());
-            FileLoader.getInstance().addTsfile(currentSG.get(), currentFile.get());
+            FileLoaderManager.getInstance().getFileLoader(senderName.get())
+                .addTsfile(currentSG.get(), currentFile.get());
           }
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
index 576b416..0825774 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/Constans.java
@@ -40,6 +40,8 @@ public class Constans {
 
   public static final String LOCK_FILE_NAME = "sync_lock";
 
+  public static final String UUID_FILE_NAME = "uuid.txt";
+
   public static final String SCHEMA_POS_FILE_NAME = "sync_schema_pos";
 
   public static final String LAST_LOCAL_FILE_NAME = "last_local_files.txt";
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
index d6028ca..4a0ae8b 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/conf/SyncSenderConfig.java
@@ -32,6 +32,8 @@ public class SyncSenderConfig {
 
   private String lockFilePath;
 
+  private String uuidPath;
+
   private String lastFileInfo;
 
   private String snapshotPath;
@@ -43,6 +45,7 @@ public class SyncSenderConfig {
     senderFolderPath = dataDirectory + File.separatorChar + Constans.SYNC_SENDER + File.separatorChar +
         getSyncReceiverName();
     lockFilePath = senderFolderPath + File.separatorChar + Constans.LOCK_FILE_NAME;
+    uuidPath = senderFolderPath + File.separatorChar + Constans.UUID_FILE_NAME;
     lastFileInfo = senderFolderPath + File.separatorChar + Constans.LAST_LOCAL_FILE_NAME;
     snapshotPath = senderFolderPath + File.separatorChar + Constans.DATA_SNAPSHOT_NAME;
     if(!new File(snapshotPath).exists()){
@@ -106,6 +109,10 @@ public class SyncSenderConfig {
     this.snapshotPath = snapshotPath;
   }
 
+  public String getUuidPath() {
+    return uuidPath; // senderFolderPath + File.separatorChar + Constans.UUID_FILE_NAME
+  }
+
   public String getSyncReceiverName() {
     return serverIp + Constans.SYNC_DIR_NAME_SEPARATOR + serverPort;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
index ce67c27..ec75c0a 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/DataTransferManager.java
@@ -19,6 +19,7 @@ import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
@@ -35,6 +36,7 @@ import java.security.NoSuchAlgorithmException;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.io.FileUtils;
@@ -275,7 +277,8 @@ public class DataTransferManager implements IDataTransferManager {
   @Override
   public void confirmIdentity() throws SyncConnectionException {
     try {
-      ResultStatus status = serviceClient.check(InetAddress.getLocalHost().getHostAddress());
+      ResultStatus status = serviceClient.check(InetAddress.getLocalHost().getHostAddress(),
+          getOrCreateUUID(config.getUuidPath()));
       if (!status.success) {
         throw new SyncConnectionException(
             "The receiver rejected the synchronization task because " + status.errorMsg);
@@ -286,6 +289,39 @@ public class DataTransferManager implements IDataTransferManager {
     }
   }
 
+  /**
+   * UUID marks the identity of sender for receiver.
+   */
+  public String getOrCreateUUID(String uuidPath) throws IOException {
+    File file = new File(uuidPath);
+    String uuid;
+    if (!file.getParentFile().exists()) {
+      file.getParentFile().mkdirs();
+    }
+    if (!file.exists()) {
+      try (FileOutputStream out = new FileOutputStream(file)) {
+        file.createNewFile();
+        uuid = generateUUID();
+        out.write(uuid.getBytes());
+      } catch (IOException e) {
+        logger.error("Cannot insert UUID to file {}", file.getPath());
+        throw new IOException(e);
+      }
+    } else {
+      try (BufferedReader bf = new BufferedReader((new FileReader(uuidPath)))) {
+        uuid = bf.readLine();
+      } catch (IOException e) {
+        logger.error("Cannot read UUID from file{}", file.getPath());
+        throw new IOException(e);
+      }
+    }
+    return uuid;
+  }
+
+  private String generateUUID() {
+    return UUID.randomUUID().toString().replaceAll("-", ""); // random UUID text without dashes
+  }
+
   @Override
   public void syncSchema() throws SyncConnectionException, TException {
     int retryCount = 0;
diff --git a/service-rpc/src/main/thrift/sync.thrift b/service-rpc/src/main/thrift/sync.thrift
index 2064c4e..17bc241 100755
--- a/service-rpc/src/main/thrift/sync.thrift
+++ b/service-rpc/src/main/thrift/sync.thrift
@@ -29,7 +29,7 @@ struct ResultStatus{
 }
 
 service SyncService{
-	ResultStatus check(1:string address)
+	ResultStatus check(1:string address, 2:string uuid)
 	ResultStatus startSync();
 	ResultStatus init(1:string storageGroupName)
 	ResultStatus syncDeletedFileName(1:string fileName)