You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/23 11:28:28 UTC

[iotdb] branch master updated: [IOTDB-2933] detect sender exit and set pipestatus=STOP (#5557)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f03f0205ab [IOTDB-2933] detect sender exit and set pipestatus=STOP (#5557)
f03f0205ab is described below

commit f03f0205ab1dc7325a2f0864aacf68ea7505f36b
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Sat Apr 23 19:28:23 2022 +0800

    [IOTDB-2933] detect sender exit and set pipestatus=STOP (#5557)
---
 .../db/integration/sync/IoTDBSyncReceiverIT.java   |   2 +-
 .../apache/iotdb/db/qp/executor/PlanExecutor.java  |   2 +-
 .../apache/iotdb/db/sync/conf/SyncPathUtil.java    |  11 ++
 .../iotdb/db/sync/receiver/ReceiverService.java    |  70 +++++++----
 .../db/sync/receiver/manager/ReceiverManager.java  | 132 +++++++++++----------
 .../db/sync/receiver/recovery/ReceiverLog.java     |  20 ++--
 .../receiver/recovery/ReceiverLogAnalyzer.java     |  34 +++---
 .../db/sync/sender/service/SenderService.java      |   2 -
 .../db/sync/transport/client/TransportClient.java  |  29 +++--
 .../transport/server/TransportServiceImpl.java     |  61 +++++-----
 .../sync/receiver/manager/ReceiverManagerTest.java |  12 +-
 .../receiver/recovery/ReceiverLogAnalyzerTest.java |  17 ++-
 thrift-sync/src/main/thrift/transport.thrift       |   6 +-
 13 files changed, 214 insertions(+), 184 deletions(-)

diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 3c82dfba09..49d920d993 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -100,7 +100,7 @@ public class IoTDBSyncReceiverIT {
     EnvironmentUtils.cleanEnv();
     EnvironmentUtils.envSetUp();
     try {
-      ReceiverService.getInstance().startPipeServer();
+      ReceiverService.getInstance().startPipeServer(true);
       new Socket("localhost", 6670).close();
     } catch (Exception e) {
       Assert.fail("Failed to start pipe server because " + e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 22aa5d1996..9e6c198e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -457,7 +457,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   private boolean operateStartPipeServer() throws QueryProcessException {
     try {
-      ReceiverService.getInstance().startPipeServer();
+      ReceiverService.getInstance().startPipeServer(false);
     } catch (PipeServerException e) {
       throw new QueryProcessException(e);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
index 0889354c7d..fc38852597 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.conf;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
 
 import java.io.File;
 import java.io.IOException;
@@ -50,6 +51,16 @@ public class SyncPathUtil {
   }
 
   /** receiver */
+  public static String getFileDataDirPath(IdentityInfo identityInfo) {
+    return SyncPathUtil.getReceiverFileDataDir(
+        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+  }
+
+  public static String getPipeLogDirPath(IdentityInfo identityInfo) {
+    return SyncPathUtil.getReceiverPipeLogDir(
+        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+  }
+
   public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
     return getReceiverPipeDir(pipeName, remoteIp, createTime)
         + File.separator
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
index 04558c4a13..61dce3db05 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
@@ -60,10 +60,17 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STAT
 public class ReceiverService implements IService {
   private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
   private static final ReceiverManager receiverManager = ReceiverManager.getInstance();
-  private Collector collector;
+  private final Collector collector;
 
-  /** start receiver service */
-  public void startPipeServer() throws PipeServerException {
+  /**
+   * start receiver service
+   *
+   * @param isRecovery if true, skip the already-enabled check and force a start
+   */
+  public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
+    if (receiverManager.isPipeServerEnable() && !isRecovery) {
+      return;
+    }
     try {
       TransportServerManager.getInstance().startService();
       receiverManager.startServer();
@@ -82,7 +89,10 @@ public class ReceiverService implements IService {
   }
 
   /** stop receiver service */
-  public void stopPipeServer() throws PipeServerException {
+  public synchronized void stopPipeServer() throws PipeServerException {
+    if (!receiverManager.isPipeServerEnable()) {
+      return;
+    }
     try {
       List<PipeInfo> pipeInfos = receiverManager.getAllPipeInfos();
       for (PipeInfo pipeInfo : pipeInfos) {
@@ -100,9 +110,8 @@ public class ReceiverService implements IService {
   }
 
   /** heartbeat RPC handle */
-  public SyncResponse receiveMsg(SyncRequest request) {
+  public synchronized SyncResponse receiveMsg(SyncRequest request) {
     SyncResponse response = new SyncResponse(ResponseType.INFO, "");
-    ;
     try {
       switch (request.getType()) {
         case HEARTBEAT:
@@ -143,34 +152,47 @@ public class ReceiverService implements IService {
 
   /** create and start a new pipe named pipeName */
   private void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
-    logger.info("create Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
-    createDir(pipeName, remoteIp, createTime);
-    receiverManager.createPipe(pipeName, remoteIp, createTime);
+    PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+    if (pipeInfo == null || pipeInfo.getStatus().equals(PipeStatus.DROP)) {
+      logger.info(
+          "create Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+      createDir(pipeName, remoteIp, createTime);
+      receiverManager.createPipe(pipeName, remoteIp, createTime);
+    }
   }
 
   /** start an existed pipe named pipeName */
   private void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
-    logger.info("start Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
-    receiverManager.startPipe(pipeName, remoteIp);
-    collector.startPipe(pipeName, remoteIp, createTime);
+    PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+    if (pipeInfo != null && pipeInfo.getStatus().equals(PipeStatus.STOP)) {
+      logger.info("start Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+      receiverManager.startPipe(pipeName, remoteIp, createTime);
+      collector.startPipe(pipeName, remoteIp, createTime);
+    }
   }
 
   /** stop an existed pipe named pipeName */
   private void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
-    logger.info("stop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
-    receiverManager.stopPipe(pipeName, remoteIp);
-    collector.stopPipe(pipeName, remoteIp, createTime);
+    PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+    if (pipeInfo != null && pipeInfo.getStatus().equals(PipeStatus.RUNNING)) {
+      logger.info("stop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+      receiverManager.stopPipe(pipeName, remoteIp, createTime);
+      collector.stopPipe(pipeName, remoteIp, createTime);
+    }
   }
 
   /** drop an existed pipe named pipeName */
   private void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
-    logger.info("drop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
-    receiverManager.dropPipe(pipeName, remoteIp);
-    collector.stopPipe(pipeName, remoteIp, createTime);
-    PipeDataQueueFactory.removeBufferedPipeDataQueue(
-        SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
-    File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
-    FileUtils.deleteDirectory(dir);
+    PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+    if (pipeInfo != null && !pipeInfo.getStatus().equals(PipeStatus.DROP)) {
+      logger.info("drop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+      receiverManager.dropPipe(pipeName, remoteIp, createTime);
+      collector.stopPipe(pipeName, remoteIp, createTime);
+      PipeDataQueueFactory.removeBufferedPipeDataQueue(
+          SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
+      File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
+      FileUtils.deleteDirectory(dir);
+    }
   }
 
   private void createDir(String pipeName, String remoteIp, long createTime) {
@@ -206,7 +228,7 @@ public class ReceiverService implements IService {
   public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
     List<PipeInfo> pipeInfos;
     if (!StringUtils.isEmpty(plan.getPipeName())) {
-      pipeInfos = receiverManager.getPipeInfos(plan.getPipeName());
+      pipeInfos = receiverManager.getPipeInfosByPipeName(plan.getPipeName());
     } else {
       pipeInfos = receiverManager.getAllPipeInfos();
     }
@@ -248,7 +270,7 @@ public class ReceiverService implements IService {
     receiverManager.init();
     if (receiverManager.isPipeServerEnable()) {
       try {
-        startPipeServer();
+        startPipeServer(true);
       } catch (PipeServerException e) {
         throw new StartupException(e.getMessage());
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
index 531d648659..7abbfed8a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
@@ -39,8 +39,8 @@ public class ReceiverManager {
   private static final Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
 
   private boolean pipeServerEnable;
-  // <pipeName, <remoteIp, pipeInfo>>
-  private Map<String, Map<String, PipeInfo>> pipeInfoMap;
+  // <pipeName, <remoteIp, <createTime, status>>>
+  private Map<String, Map<String, Map<Long, PipeStatus>>> pipeInfos;
   // <pipeFolderName, pipeMsg>
   private Map<String, List<PipeMessage>> pipeMessageMap;
   private ReceiverLog log;
@@ -49,7 +49,7 @@ public class ReceiverManager {
     log = new ReceiverLog();
     ReceiverLogAnalyzer analyzer = new ReceiverLogAnalyzer();
     analyzer.scan();
-    pipeInfoMap = analyzer.getPipeInfoMap();
+    pipeInfos = analyzer.getPipeInfos();
     pipeServerEnable = analyzer.isPipeServerEnable();
     pipeMessageMap = analyzer.getPipeMessageMap();
   }
@@ -70,43 +70,59 @@ public class ReceiverManager {
 
   public void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
     log.createPipe(pipeName, remoteIp, createTime);
-    if (!pipeInfoMap.containsKey(pipeName)) {
-      pipeInfoMap.put(pipeName, new HashMap<>());
-    }
-    pipeInfoMap
-        .get(pipeName)
-        .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.STOP, createTime));
+    pipeInfos.putIfAbsent(pipeName, new HashMap<>());
+    pipeInfos.get(pipeName).putIfAbsent(remoteIp, new HashMap<>());
+    pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.STOP);
   }
 
-  public void startPipe(String pipeName, String remoteIp) throws IOException {
-    log.startPipe(pipeName, remoteIp);
-    pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.RUNNING);
+  public void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+    log.startPipe(pipeName, remoteIp, createTime);
+    pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.RUNNING);
   }
 
-  public void stopPipe(String pipeName, String remoteIp) throws IOException {
-    log.stopPipe(pipeName, remoteIp);
-    pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.STOP);
+  public void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+    log.stopPipe(pipeName, remoteIp, createTime);
+    pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.STOP);
   }
 
-  public void dropPipe(String pipeName, String remoteIp) throws IOException {
-    log.dropPipe(pipeName, remoteIp);
-    pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.DROP);
+  public void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+    log.dropPipe(pipeName, remoteIp, createTime);
+    pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.DROP);
   }
 
-  public List<PipeInfo> getPipeInfos(String pipeName) {
-    List<PipeInfo> res;
-    if (pipeInfoMap.containsKey(pipeName)) {
-      res = new ArrayList<>(pipeInfoMap.get(pipeName).values());
+  public List<PipeInfo> getPipeInfosByPipeName(String pipeName) {
+    if (!pipeInfos.containsKey(pipeName)) {
+      return Collections.emptyList();
     } else {
-      res = Collections.emptyList();
+      List<PipeInfo> res = new ArrayList<>();
+      for (Map.Entry<String, Map<Long, PipeStatus>> remoteIpEntry :
+          pipeInfos.get(pipeName).entrySet()) {
+        for (Map.Entry<Long, PipeStatus> createTimeEntry : remoteIpEntry.getValue().entrySet()) {
+          res.add(
+              new PipeInfo(
+                  pipeName,
+                  remoteIpEntry.getKey(),
+                  createTimeEntry.getValue(),
+                  createTimeEntry.getKey()));
+        }
+      }
+      return res;
+    }
+  }
+
+  public PipeInfo getPipeInfo(String pipeName, String remoteIp, long createTime) {
+    if (pipeInfos.containsKey(pipeName) && pipeInfos.get(pipeName).containsKey(remoteIp)) {
+      return new PipeInfo(
+          pipeName, remoteIp, pipeInfos.get(pipeName).get(remoteIp).get(createTime), createTime);
+    } else {
+      return null;
     }
-    return res;
   }
 
   public List<PipeInfo> getAllPipeInfos() {
     List<PipeInfo> res = new ArrayList<>();
-    for (String pipeName : pipeInfoMap.keySet()) {
-      res.addAll(pipeInfoMap.get(pipeName).values());
+    for (String pipeName : pipeInfos.keySet()) {
+      res.addAll(getPipeInfosByPipeName(pipeName));
     }
     return res;
   }
@@ -119,24 +135,19 @@ public class ReceiverManager {
    * @param createTime createTime of pipe
    * @param message pipe message
    */
-  public void writePipeMessage(
+  public synchronized void writePipeMessage(
       String pipeName, String remoteIp, long createTime, PipeMessage message) {
-    if (pipeInfoMap.containsKey(pipeName) && pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
-      synchronized (pipeInfoMap.get(pipeName).get(remoteIp)) {
-        String pipeIdentifier =
-            SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
-        try {
-          log.writePipeMsg(pipeIdentifier, message);
-        } catch (IOException e) {
-          logger.error(
-              "Can not write pipe message {} from {} to disk because {}",
-              message,
-              pipeIdentifier,
-              e.getMessage());
-        }
-        pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList<>()).add(message);
-      }
+    String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+    try {
+      log.writePipeMsg(pipeIdentifier, message);
+    } catch (IOException e) {
+      logger.error(
+          "Can not write pipe message {} from {} to disk because {}",
+          message,
+          pipeIdentifier,
+          e.getMessage());
     }
+    pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList<>()).add(message);
   }
 
   /**
@@ -149,29 +160,24 @@ public class ReceiverManager {
    *     messages can be read next time.
    * @return recent messages
    */
-  public List<PipeMessage> getPipeMessages(
+  public synchronized List<PipeMessage> getPipeMessages(
       String pipeName, String remoteIp, long createTime, boolean consume) {
     List<PipeMessage> pipeMessageList = new ArrayList<>();
-    if (pipeInfoMap.containsKey(pipeName) && pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
-      synchronized (pipeInfoMap.get(pipeName).get(remoteIp)) {
-        String pipeIdentifier =
-            SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
-        if (consume) {
-          try {
-            log.comsumePipeMsg(pipeIdentifier);
-          } catch (IOException e) {
-            logger.error(
-                "Can not read pipe message about {} from disk because {}",
-                pipeIdentifier,
-                e.getMessage());
-          }
-        }
-        if (pipeMessageMap.containsKey(pipeIdentifier)) {
-          pipeMessageList = pipeMessageMap.get(pipeIdentifier);
-          if (consume) {
-            pipeMessageMap.remove(pipeIdentifier);
-          }
-        }
+    String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+    if (consume) {
+      try {
+        log.comsumePipeMsg(pipeIdentifier);
+      } catch (IOException e) {
+        logger.error(
+            "Can not read pipe message about {} from disk because {}",
+            pipeIdentifier,
+            e.getMessage());
+      }
+    }
+    if (pipeMessageMap.containsKey(pipeIdentifier)) {
+      pipeMessageList = pipeMessageMap.get(pipeIdentifier);
+      if (consume) {
+        pipeMessageMap.remove(pipeIdentifier);
       }
     }
     return pipeMessageList;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
index 12c7d0281c..9a1840c4b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
@@ -61,19 +61,19 @@ public class ReceiverLog {
   }
 
   public void createPipe(String pipeName, String remoteIp, long time) throws IOException {
-    writeLog(pipeName, remoteIp, PipeStatus.RUNNING, time);
+    writeLog(pipeName, remoteIp, time);
   }
 
-  public void startPipe(String pipeName, String remoteIp) throws IOException {
-    writeLog(pipeName, remoteIp, PipeStatus.RUNNING);
+  public void startPipe(String pipeName, String remoteIp, long time) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.RUNNING, time);
   }
 
-  public void stopPipe(String pipeName, String remoteIp) throws IOException {
-    writeLog(pipeName, remoteIp, PipeStatus.STOP);
+  public void stopPipe(String pipeName, String remoteIp, long time) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.STOP, time);
   }
 
-  public void dropPipe(String pipeName, String remoteIp) throws IOException {
-    writeLog(pipeName, remoteIp, PipeStatus.DROP);
+  public void dropPipe(String pipeName, String remoteIp, long time) throws IOException {
+    writeLog(pipeName, remoteIp, PipeStatus.DROP, time);
   }
 
   public void writePipeMsg(String pipeIdentifier, PipeMessage pipeMessage) throws IOException {
@@ -100,16 +100,16 @@ public class ReceiverLog {
     if (pipeServerWriter == null) {
       init();
     }
-    pipeServerWriter.write(String.format("%s,%s,%s,%d", pipeName, remoteIp, status, time));
+    pipeServerWriter.write(String.format("%s,%s,%d,%s", pipeName, remoteIp, time, status));
     pipeServerWriter.newLine();
     pipeServerWriter.flush();
   }
 
-  private void writeLog(String pipeName, String remoteIp, PipeStatus status) throws IOException {
+  private void writeLog(String pipeName, String remoteIp, long time) throws IOException {
     if (pipeServerWriter == null) {
       init();
     }
-    pipeServerWriter.write(String.format("%s,%s,%s", pipeName, remoteIp, status));
+    pipeServerWriter.write(String.format("%s,%s,%d", pipeName, remoteIp, time));
     pipeServerWriter.newLine();
     pipeServerWriter.flush();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
index 02dc2229fe..3442c1154c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.db.sync.conf.SyncConstant;
 import org.apache.iotdb.db.sync.conf.SyncPathUtil;
-import org.apache.iotdb.db.sync.receiver.manager.PipeInfo;
 import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe.PipeStatus;
 
@@ -43,12 +42,14 @@ public class ReceiverLogAnalyzer {
   private static final Logger logger = LoggerFactory.getLogger(ReceiverLogAnalyzer.class);
   // record recovery result of receiver server status
   private boolean pipeServerEnable = false;
-  private Map<String, Map<String, PipeInfo>> pipeInfoMap = new ConcurrentHashMap<>();
+  // <pipeName, <remoteIp, <createTime, status>>>
+  private Map<String, Map<String, Map<Long, PipeStatus>>> pipeInfos = new ConcurrentHashMap<>();
+  // <pipeFolderName, pipeMsg>
   private Map<String, List<PipeMessage>> pipeMessageMap = new ConcurrentHashMap<>();
 
   public void scan() throws StartupException {
     logger.info("Start to recover all sync state for sync receiver.");
-    pipeInfoMap = new ConcurrentHashMap<>();
+    pipeInfos = new ConcurrentHashMap<>();
     pipeMessageMap = new ConcurrentHashMap<>();
     pipeServerEnable = false;
     File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.RECEIVER_LOG_NAME);
@@ -96,8 +97,8 @@ public class ReceiverLogAnalyzer {
     return pipeServerEnable;
   }
 
-  public Map<String, Map<String, PipeInfo>> getPipeInfoMap() {
-    return pipeInfoMap;
+  public Map<String, Map<String, Map<Long, PipeStatus>>> getPipeInfos() {
+    return pipeInfos;
   }
 
   public Map<String, List<PipeMessage>> getPipeMessageMap() {
@@ -118,21 +119,16 @@ public class ReceiverLogAnalyzer {
       String[] items = logLine.split(",");
       String pipeName = items[0];
       String remoteIp = items[1];
-      PipeStatus status = PipeStatus.valueOf(items[2]);
-      if (status == PipeStatus.RUNNING) {
-        if (!pipeInfoMap.containsKey(pipeName)) {
-          pipeInfoMap.put(pipeName, new HashMap<>());
-        }
-        if (items.length == 4) {
-          // create
-          pipeInfoMap
-              .get(pipeName)
-              .put(remoteIp, new PipeInfo(pipeName, remoteIp, status, Long.parseLong(items[3])));
-        } else {
-          pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
-        }
+      long createTime = Long.parseLong(items[2]);
+      if (items.length == 4) {
+        // start, stop, drop
+        PipeStatus status = PipeStatus.valueOf(items[3]);
+        pipeInfos.get(pipeName).get(remoteIp).put(createTime, status);
       } else {
-        pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
+        // create
+        pipeInfos.putIfAbsent(pipeName, new HashMap<>());
+        pipeInfos.get(pipeName).putIfAbsent(remoteIp, new HashMap<>());
+        pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.STOP);
       }
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
index d21fb2973f..b4861053ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
@@ -215,7 +215,6 @@ public class SenderService implements IService {
   public synchronized void stopPipe(String pipeName) throws PipeException {
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
-      sendMsg(RequestType.STOP);
       runningPipe.stop();
       transportHandler.stop();
     }
@@ -225,7 +224,6 @@ public class SenderService implements IService {
   public synchronized void startPipe(String pipeName) throws PipeException {
     checkRunningPipeExistAndName(pipeName);
     if (runningPipe.getStatus() == Pipe.PipeStatus.STOP) {
-      sendMsg(RequestType.START);
       runningPipe.start();
       transportHandler.start();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
index 3869b5a4fc..cc5375d475 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
@@ -30,14 +30,7 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.ResponseType;
-import org.apache.iotdb.service.transport.thrift.SyncRequest;
-import org.apache.iotdb.service.transport.thrift.SyncResponse;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.service.transport.thrift.*;
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -299,7 +292,7 @@ public class TransportClient implements ITransportClient {
             try {
               status =
                   serviceClient.transportData(
-                      identityInfo, metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+                      metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
             } catch (TException e) {
               // retry
               logger.error("TException happened! ", e);
@@ -377,9 +370,7 @@ public class TransportClient implements ITransportClient {
                 file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
       }
       try {
-        status =
-            serviceClient.checkFileDigest(
-                identityInfo, metaInfo, ByteBuffer.wrap(messageDigest.digest()));
+        status = serviceClient.checkFileDigest(metaInfo, ByteBuffer.wrap(messageDigest.digest()));
       } catch (TException e) {
         // retry
         logger.error("TException happens! ", e);
@@ -420,7 +411,7 @@ public class TransportClient implements ITransportClient {
             new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
         TransportStatus status =
             serviceClient.transportData(
-                identityInfo, metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+                metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
 
         if (status.code == SUCCESS_CODE) {
           break;
@@ -451,6 +442,14 @@ public class TransportClient implements ITransportClient {
         throw new SyncConnectionException(
             String.format("Handshake with receiver %s:%d error.", ipAddress, port));
       }
+      SenderService.getInstance()
+          .receiveMsg(
+              heartbeat(
+                  new SyncRequest(
+                      RequestType.START,
+                      pipe.getName(),
+                      InetAddress.getLocalHost().getHostAddress(),
+                      pipe.getCreateTime())));
       while (!Thread.currentThread().isInterrupted()) {
         PipeData pipeData = pipe.take();
         if (!senderTransport(pipeData)) {
@@ -468,7 +467,7 @@ public class TransportClient implements ITransportClient {
       }
     } catch (InterruptedException e) {
       logger.info("Interrupted by pipe, exit transport.");
-    } catch (SyncConnectionException e) {
+    } catch (SyncConnectionException | UnknownHostException e) {
       logger.error(
           String.format("Connect to receiver %s:%d error, because %s.", ipAddress, port, e));
       SenderService.getInstance()
@@ -509,7 +508,7 @@ public class TransportClient implements ITransportClient {
           heartbeatTransport.open();
         }
 
-        return heartbeatClient.heartbeat(identityInfo, syncRequest);
+        return heartbeatClient.heartbeat(syncRequest);
       } catch (TException e) {
         logger.info(
             String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
index f9e57bca69..c53702dace 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
@@ -28,13 +28,7 @@ import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueueFactory;
 import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.SyncRequest;
-import org.apache.iotdb.service.transport.thrift.SyncResponse;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.service.transport.thrift.*;
 
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -70,6 +64,11 @@ public class TransportServiceImpl implements TransportService.Iface {
   private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final String RECORD_SUFFIX = ".record";
   private static final String PATCH_SUFFIX = ".patch";
+  private ThreadLocal<IdentityInfo> identityInfoThreadLocal;
+
+  public TransportServiceImpl() {
+    identityInfoThreadLocal = new ThreadLocal<>();
+  }
 
   private class CheckResult {
     boolean result;
@@ -135,6 +134,7 @@ public class TransportServiceImpl implements TransportService.Iface {
   @Override
   public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
     logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
+    identityInfoThreadLocal.set(identityInfo);
     // check ip address
     if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
       return new TransportStatus(
@@ -150,8 +150,8 @@ public class TransportServiceImpl implements TransportService.Iface {
               identityInfo.version, config.getIoTDBVersion()));
     }
 
-    if (!new File(getFileDataDirPath(identityInfo)).exists()) {
-      new File(getFileDataDirPath(identityInfo)).mkdirs();
+    if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) {
+      new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
     }
     return new TransportStatus(SUCCESS_CODE, "");
   }
@@ -197,11 +197,11 @@ public class TransportServiceImpl implements TransportService.Iface {
   }
 
   @Override
-  public TransportStatus transportData(
-      IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
+  public TransportStatus transportData(MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
+    IdentityInfo identityInfo = identityInfoThreadLocal.get();
     logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
 
-    String fileDir = getFileDataDirPath(identityInfo);
+    String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
     Type type = metaInfo.type;
     String fileName = metaInfo.fileName;
     long startIndex = metaInfo.startIndex;
@@ -246,7 +246,7 @@ public class TransportServiceImpl implements TransportService.Iface {
           // Do with file
           handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
         }
-        PipeDataQueueFactory.getBufferedPipeDataQueue(getPipeLogDirPath(identityInfo))
+        PipeDataQueueFactory.getBufferedPipeDataQueue(SyncPathUtil.getPipeLogDirPath(identityInfo))
             .offer(pipeData);
       } catch (IOException | IllegalPathException e) {
         logger.error("Pipe data transport error, {}", e.getMessage());
@@ -280,11 +280,10 @@ public class TransportServiceImpl implements TransportService.Iface {
   }
 
   @Override
-  public TransportStatus checkFileDigest(
-      IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer digest) throws TException {
+  public TransportStatus checkFileDigest(MetaInfo metaInfo, ByteBuffer digest) throws TException {
+    IdentityInfo identityInfo = identityInfoThreadLocal.get();
     logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
-
-    String fileDir = getFileDataDirPath(identityInfo);
+    String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
     synchronized (fileDir.intern()) {
       String fileName = metaInfo.fileName;
       MessageDigest messageDigest = null;
@@ -326,8 +325,7 @@ public class TransportServiceImpl implements TransportService.Iface {
   }
 
   @Override
-  public SyncResponse heartbeat(IdentityInfo identityInfo, SyncRequest syncRequest)
-      throws TException {
+  public SyncResponse heartbeat(SyncRequest syncRequest) throws TException {
     return ReceiverService.getInstance().receiveMsg(syncRequest);
   }
 
@@ -343,8 +341,19 @@ public class TransportServiceImpl implements TransportService.Iface {
    * release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
    */
   public void handleClientExit() {
-    // TODO: Handle client exit here.
-    // do nothing now
+    // Handle client exit here.
+    IdentityInfo identityInfo = identityInfoThreadLocal.get();
+    if (identityInfo != null) {
+      // stop pipe
+      identityInfoThreadLocal.remove();
+      ReceiverService.getInstance()
+          .receiveMsg(
+              new SyncRequest(
+                  RequestType.STOP,
+                  identityInfo.getPipeName(),
+                  identityInfo.getAddress(),
+                  identityInfo.getCreateTime()));
+    }
   }
 
   /**
@@ -379,14 +388,4 @@ public class TransportServiceImpl implements TransportService.Iface {
           String.format("Delete record file %s error, because %s.", recordFile.getPath(), e));
     }
   }
-
-  private String getFileDataDirPath(IdentityInfo identityInfo) {
-    return SyncPathUtil.getReceiverFileDataDir(
-        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
-  }
-
-  private String getPipeLogDirPath(IdentityInfo identityInfo) {
-    return SyncPathUtil.getReceiverPipeLogDir(
-        identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
-  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
index 2bc07dce99..eb4d2859cb 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
@@ -56,14 +56,14 @@ public class ReceiverManagerTest {
       manager.createPipe(pipe1, ip1, 1);
       manager.createPipe(pipe2, ip2, 2);
       manager.createPipe(pipe1, ip2, 3);
-      manager.stopPipe(pipe1, ip1);
-      manager.stopPipe(pipe2, ip2);
-      manager.dropPipe(pipe1, ip2);
-      manager.startPipe(pipe1, ip1);
+      manager.stopPipe(pipe1, ip1, 1);
+      manager.stopPipe(pipe2, ip2, 2);
+      manager.dropPipe(pipe1, ip2, 3);
+      manager.startPipe(pipe1, ip1, 1);
       List<PipeInfo> allPipeInfos = manager.getAllPipeInfos();
       Assert.assertEquals(3, allPipeInfos.size());
-      List<PipeInfo> pipeInfos1 = manager.getPipeInfos(pipe1);
-      List<PipeInfo> pipeInfos2 = manager.getPipeInfos(pipe2);
+      List<PipeInfo> pipeInfos1 = manager.getPipeInfosByPipeName(pipe1);
+      List<PipeInfo> pipeInfos2 = manager.getPipeInfosByPipeName(pipe2);
       Assert.assertEquals(2, pipeInfos1.size());
       Assert.assertEquals(1, pipeInfos2.size());
       for (PipeInfo pipeInfo : pipeInfos2) {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
index 6ced92cdd4..b97e7686e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.recovery;
 
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.sync.conf.SyncPathUtil;
-import org.apache.iotdb.db.sync.receiver.manager.PipeInfo;
 import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe.PipeStatus;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -62,24 +61,24 @@ public class ReceiverLogAnalyzerTest {
       log.createPipe(pipe1, ip1, 1);
       log.createPipe(pipe2, ip2, 2);
       log.createPipe(pipe1, ip2, 3);
-      log.stopPipe(pipe1, ip1);
+      log.stopPipe(pipe1, ip1, 1);
       log.stopPipeServer();
       log.startPipeServer();
-      log.stopPipe(pipe2, ip2);
-      log.dropPipe(pipe1, ip2);
-      log.startPipe(pipe1, ip1);
+      log.stopPipe(pipe2, ip2, 2);
+      log.dropPipe(pipe1, ip2, 3);
+      log.startPipe(pipe1, ip1, 1);
       log.close();
       ReceiverLogAnalyzer receiverLogAnalyzer = new ReceiverLogAnalyzer();
       receiverLogAnalyzer.scan();
-      Map<String, Map<String, PipeInfo>> map = receiverLogAnalyzer.getPipeInfoMap();
+      Map<String, Map<String, Map<Long, PipeStatus>>> map = receiverLogAnalyzer.getPipeInfos();
       Assert.assertTrue(receiverLogAnalyzer.isPipeServerEnable());
       Assert.assertNotNull(map);
       Assert.assertEquals(2, map.get(pipe1).size());
       Assert.assertEquals(1, map.get(pipe2).size());
       Assert.assertEquals(1, map.get(pipe2).size());
-      Assert.assertEquals(new PipeInfo(pipe2, ip2, PipeStatus.STOP, 2), map.get(pipe2).get(ip2));
-      Assert.assertEquals(new PipeInfo(pipe1, ip1, PipeStatus.RUNNING, 1), map.get(pipe1).get(ip1));
-      Assert.assertEquals(new PipeInfo(pipe1, ip2, PipeStatus.DROP, 3), map.get(pipe1).get(ip2));
+      Assert.assertEquals(PipeStatus.STOP, map.get(pipe2).get(ip2).get(2L));
+      Assert.assertEquals(PipeStatus.RUNNING, map.get(pipe1).get(ip1).get(1L));
+      Assert.assertEquals(PipeStatus.DROP, map.get(pipe1).get(ip2).get(3L));
     } catch (Exception e) {
       Assert.fail();
       e.printStackTrace();
diff --git a/thrift-sync/src/main/thrift/transport.thrift b/thrift-sync/src/main/thrift/transport.thrift
index 74a4512ae3..d88c1e1400 100644
--- a/thrift-sync/src/main/thrift/transport.thrift
+++ b/thrift-sync/src/main/thrift/transport.thrift
@@ -84,7 +84,7 @@ struct SyncResponse{
 
 service TransportService{
   TransportStatus handshake(IdentityInfo info);
-  TransportStatus transportData(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary buff, 4:binary digest);
-  TransportStatus checkFileDigest(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary digest);
-  SyncResponse heartbeat(1:IdentityInfo identityInfo, 2:SyncRequest syncRequest)
+  TransportStatus transportData(1:MetaInfo metaInfo, 2:binary buff, 3:binary digest);
+  TransportStatus checkFileDigest(1:MetaInfo metaInfo, 2:binary digest);
+  SyncResponse heartbeat(SyncRequest syncRequest)
 }