You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/23 11:28:28 UTC
[iotdb] branch master updated: [IOTDB-2933] detect sender exit and set pipestatus=STOP (#5557)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f03f0205ab [IOTDB-2933] detect sender exit and set pipestatus=STOP (#5557)
f03f0205ab is described below
commit f03f0205ab1dc7325a2f0864aacf68ea7505f36b
Author: Chen YZ <43...@users.noreply.github.com>
AuthorDate: Sat Apr 23 19:28:23 2022 +0800
[IOTDB-2933] detect sender exit and set pipestatus=STOP (#5557)
---
.../db/integration/sync/IoTDBSyncReceiverIT.java | 2 +-
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
.../apache/iotdb/db/sync/conf/SyncPathUtil.java | 11 ++
.../iotdb/db/sync/receiver/ReceiverService.java | 70 +++++++----
.../db/sync/receiver/manager/ReceiverManager.java | 132 +++++++++++----------
.../db/sync/receiver/recovery/ReceiverLog.java | 20 ++--
.../receiver/recovery/ReceiverLogAnalyzer.java | 34 +++---
.../db/sync/sender/service/SenderService.java | 2 -
.../db/sync/transport/client/TransportClient.java | 29 +++--
.../transport/server/TransportServiceImpl.java | 61 +++++-----
.../sync/receiver/manager/ReceiverManagerTest.java | 12 +-
.../receiver/recovery/ReceiverLogAnalyzerTest.java | 17 ++-
thrift-sync/src/main/thrift/transport.thrift | 6 +-
13 files changed, 214 insertions(+), 184 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
index 3c82dfba09..49d920d993 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
@@ -100,7 +100,7 @@ public class IoTDBSyncReceiverIT {
EnvironmentUtils.cleanEnv();
EnvironmentUtils.envSetUp();
try {
- ReceiverService.getInstance().startPipeServer();
+ ReceiverService.getInstance().startPipeServer(true);
new Socket("localhost", 6670).close();
} catch (Exception e) {
Assert.fail("Failed to start pipe server because " + e.getMessage());
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 22aa5d1996..9e6c198e9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -457,7 +457,7 @@ public class PlanExecutor implements IPlanExecutor {
private boolean operateStartPipeServer() throws QueryProcessException {
try {
- ReceiverService.getInstance().startPipeServer();
+ ReceiverService.getInstance().startPipeServer(false);
} catch (PipeServerException e) {
throw new QueryProcessException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
index 0889354c7d..fc38852597 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/conf/SyncPathUtil.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.sync.conf;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
+import org.apache.iotdb.service.transport.thrift.IdentityInfo;
import java.io.File;
import java.io.IOException;
@@ -50,6 +51,16 @@ public class SyncPathUtil {
}
/** receiver */
+ public static String getFileDataDirPath(IdentityInfo identityInfo) {
+ return SyncPathUtil.getReceiverFileDataDir(
+ identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+ }
+
+ public static String getPipeLogDirPath(IdentityInfo identityInfo) {
+ return SyncPathUtil.getReceiverPipeLogDir(
+ identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
+ }
+
public static String getReceiverPipeLogDir(String pipeName, String remoteIp, long createTime) {
return getReceiverPipeDir(pipeName, remoteIp, createTime)
+ File.separator
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
index 04558c4a13..61dce3db05 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
@@ -60,10 +60,17 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STAT
public class ReceiverService implements IService {
private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
private static final ReceiverManager receiverManager = ReceiverManager.getInstance();
- private Collector collector;
+ private final Collector collector;
- /** start receiver service */
- public void startPipeServer() throws PipeServerException {
+ /**
+ * start receiver service
+ *
+ * @param isRecovery if isRecovery, it will ignore check and force a start
+ */
+ public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
+ if (receiverManager.isPipeServerEnable() && !isRecovery) {
+ return;
+ }
try {
TransportServerManager.getInstance().startService();
receiverManager.startServer();
@@ -82,7 +89,10 @@ public class ReceiverService implements IService {
}
/** stop receiver service */
- public void stopPipeServer() throws PipeServerException {
+ public synchronized void stopPipeServer() throws PipeServerException {
+ if (!receiverManager.isPipeServerEnable()) {
+ return;
+ }
try {
List<PipeInfo> pipeInfos = receiverManager.getAllPipeInfos();
for (PipeInfo pipeInfo : pipeInfos) {
@@ -100,9 +110,8 @@ public class ReceiverService implements IService {
}
/** heartbeat RPC handle */
- public SyncResponse receiveMsg(SyncRequest request) {
+ public synchronized SyncResponse receiveMsg(SyncRequest request) {
SyncResponse response = new SyncResponse(ResponseType.INFO, "");
- ;
try {
switch (request.getType()) {
case HEARTBEAT:
@@ -143,34 +152,47 @@ public class ReceiverService implements IService {
/** create and start a new pipe named pipeName */
private void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
- logger.info("create Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
- createDir(pipeName, remoteIp, createTime);
- receiverManager.createPipe(pipeName, remoteIp, createTime);
+ PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+ if (pipeInfo == null || pipeInfo.getStatus().equals(PipeStatus.DROP)) {
+ logger.info(
+ "create Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+ createDir(pipeName, remoteIp, createTime);
+ receiverManager.createPipe(pipeName, remoteIp, createTime);
+ }
}
/** start an existed pipe named pipeName */
private void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
- logger.info("start Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
- receiverManager.startPipe(pipeName, remoteIp);
- collector.startPipe(pipeName, remoteIp, createTime);
+ PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+ if (pipeInfo != null && pipeInfo.getStatus().equals(PipeStatus.STOP)) {
+ logger.info("start Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+ receiverManager.startPipe(pipeName, remoteIp, createTime);
+ collector.startPipe(pipeName, remoteIp, createTime);
+ }
}
/** stop an existed pipe named pipeName */
private void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
- logger.info("stop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
- receiverManager.stopPipe(pipeName, remoteIp);
- collector.stopPipe(pipeName, remoteIp, createTime);
+ PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+ if (pipeInfo != null && pipeInfo.getStatus().equals(PipeStatus.RUNNING)) {
+ logger.info("stop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+ receiverManager.stopPipe(pipeName, remoteIp, createTime);
+ collector.stopPipe(pipeName, remoteIp, createTime);
+ }
}
/** drop an existed pipe named pipeName */
private void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
- logger.info("drop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
- receiverManager.dropPipe(pipeName, remoteIp);
- collector.stopPipe(pipeName, remoteIp, createTime);
- PipeDataQueueFactory.removeBufferedPipeDataQueue(
- SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
- File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
- FileUtils.deleteDirectory(dir);
+ PipeInfo pipeInfo = receiverManager.getPipeInfo(pipeName, remoteIp, createTime);
+ if (pipeInfo != null && !pipeInfo.getStatus().equals(PipeStatus.DROP)) {
+ logger.info("drop Pipe name={}, remoteIp={}, createTime={}", pipeName, remoteIp, createTime);
+ receiverManager.dropPipe(pipeName, remoteIp, createTime);
+ collector.stopPipe(pipeName, remoteIp, createTime);
+ PipeDataQueueFactory.removeBufferedPipeDataQueue(
+ SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
+ File dir = new File(SyncPathUtil.getReceiverPipeDir(pipeName, remoteIp, createTime));
+ FileUtils.deleteDirectory(dir);
+ }
}
private void createDir(String pipeName, String remoteIp, long createTime) {
@@ -206,7 +228,7 @@ public class ReceiverService implements IService {
public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
List<PipeInfo> pipeInfos;
if (!StringUtils.isEmpty(plan.getPipeName())) {
- pipeInfos = receiverManager.getPipeInfos(plan.getPipeName());
+ pipeInfos = receiverManager.getPipeInfosByPipeName(plan.getPipeName());
} else {
pipeInfos = receiverManager.getAllPipeInfos();
}
@@ -248,7 +270,7 @@ public class ReceiverService implements IService {
receiverManager.init();
if (receiverManager.isPipeServerEnable()) {
try {
- startPipeServer();
+ startPipeServer(true);
} catch (PipeServerException e) {
throw new StartupException(e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
index 531d648659..7abbfed8a3 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
@@ -39,8 +39,8 @@ public class ReceiverManager {
private static final Logger logger = LoggerFactory.getLogger(ReceiverManager.class);
private boolean pipeServerEnable;
- // <pipeName, <remoteIp, pipeInfo>>
- private Map<String, Map<String, PipeInfo>> pipeInfoMap;
+ // <pipeName, <remoteIp, <createTime, status>>>
+ private Map<String, Map<String, Map<Long, PipeStatus>>> pipeInfos;
// <pipeFolderName, pipeMsg>
private Map<String, List<PipeMessage>> pipeMessageMap;
private ReceiverLog log;
@@ -49,7 +49,7 @@ public class ReceiverManager {
log = new ReceiverLog();
ReceiverLogAnalyzer analyzer = new ReceiverLogAnalyzer();
analyzer.scan();
- pipeInfoMap = analyzer.getPipeInfoMap();
+ pipeInfos = analyzer.getPipeInfos();
pipeServerEnable = analyzer.isPipeServerEnable();
pipeMessageMap = analyzer.getPipeMessageMap();
}
@@ -70,43 +70,59 @@ public class ReceiverManager {
public void createPipe(String pipeName, String remoteIp, long createTime) throws IOException {
log.createPipe(pipeName, remoteIp, createTime);
- if (!pipeInfoMap.containsKey(pipeName)) {
- pipeInfoMap.put(pipeName, new HashMap<>());
- }
- pipeInfoMap
- .get(pipeName)
- .put(remoteIp, new PipeInfo(pipeName, remoteIp, PipeStatus.STOP, createTime));
+ pipeInfos.putIfAbsent(pipeName, new HashMap<>());
+ pipeInfos.get(pipeName).putIfAbsent(remoteIp, new HashMap<>());
+ pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.STOP);
}
- public void startPipe(String pipeName, String remoteIp) throws IOException {
- log.startPipe(pipeName, remoteIp);
- pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.RUNNING);
+ public void startPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+ log.startPipe(pipeName, remoteIp, createTime);
+ pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.RUNNING);
}
- public void stopPipe(String pipeName, String remoteIp) throws IOException {
- log.stopPipe(pipeName, remoteIp);
- pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.STOP);
+ public void stopPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+ log.stopPipe(pipeName, remoteIp, createTime);
+ pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.STOP);
}
- public void dropPipe(String pipeName, String remoteIp) throws IOException {
- log.dropPipe(pipeName, remoteIp);
- pipeInfoMap.get(pipeName).get(remoteIp).setStatus(PipeStatus.DROP);
+ public void dropPipe(String pipeName, String remoteIp, long createTime) throws IOException {
+ log.dropPipe(pipeName, remoteIp, createTime);
+ pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.DROP);
}
- public List<PipeInfo> getPipeInfos(String pipeName) {
- List<PipeInfo> res;
- if (pipeInfoMap.containsKey(pipeName)) {
- res = new ArrayList<>(pipeInfoMap.get(pipeName).values());
+ public List<PipeInfo> getPipeInfosByPipeName(String pipeName) {
+ if (!pipeInfos.containsKey(pipeName)) {
+ return Collections.emptyList();
} else {
- res = Collections.emptyList();
+ List<PipeInfo> res = new ArrayList<>();
+ for (Map.Entry<String, Map<Long, PipeStatus>> remoteIpEntry :
+ pipeInfos.get(pipeName).entrySet()) {
+ for (Map.Entry<Long, PipeStatus> createTimeEntry : remoteIpEntry.getValue().entrySet()) {
+ res.add(
+ new PipeInfo(
+ pipeName,
+ remoteIpEntry.getKey(),
+ createTimeEntry.getValue(),
+ createTimeEntry.getKey()));
+ }
+ }
+ return res;
+ }
+ }
+
+ public PipeInfo getPipeInfo(String pipeName, String remoteIp, long createTime) {
+ if (pipeInfos.containsKey(pipeName) && pipeInfos.get(pipeName).containsKey(remoteIp)) {
+ return new PipeInfo(
+ pipeName, remoteIp, pipeInfos.get(pipeName).get(remoteIp).get(createTime), createTime);
+ } else {
+ return null;
}
- return res;
}
public List<PipeInfo> getAllPipeInfos() {
List<PipeInfo> res = new ArrayList<>();
- for (String pipeName : pipeInfoMap.keySet()) {
- res.addAll(pipeInfoMap.get(pipeName).values());
+ for (String pipeName : pipeInfos.keySet()) {
+ res.addAll(getPipeInfosByPipeName(pipeName));
}
return res;
}
@@ -119,24 +135,19 @@ public class ReceiverManager {
* @param createTime createTime of pipe
* @param message pipe message
*/
- public void writePipeMessage(
+ public synchronized void writePipeMessage(
String pipeName, String remoteIp, long createTime, PipeMessage message) {
- if (pipeInfoMap.containsKey(pipeName) && pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
- synchronized (pipeInfoMap.get(pipeName).get(remoteIp)) {
- String pipeIdentifier =
- SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
- try {
- log.writePipeMsg(pipeIdentifier, message);
- } catch (IOException e) {
- logger.error(
- "Can not write pipe message {} from {} to disk because {}",
- message,
- pipeIdentifier,
- e.getMessage());
- }
- pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList<>()).add(message);
- }
+ String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+ try {
+ log.writePipeMsg(pipeIdentifier, message);
+ } catch (IOException e) {
+ logger.error(
+ "Can not write pipe message {} from {} to disk because {}",
+ message,
+ pipeIdentifier,
+ e.getMessage());
}
+ pipeMessageMap.computeIfAbsent(pipeIdentifier, i -> new ArrayList<>()).add(message);
}
/**
@@ -149,29 +160,24 @@ public class ReceiverManager {
* messages can be read next time.
* @return recent messages
*/
- public List<PipeMessage> getPipeMessages(
+ public synchronized List<PipeMessage> getPipeMessages(
String pipeName, String remoteIp, long createTime, boolean consume) {
List<PipeMessage> pipeMessageList = new ArrayList<>();
- if (pipeInfoMap.containsKey(pipeName) && pipeInfoMap.get(pipeName).containsKey(remoteIp)) {
- synchronized (pipeInfoMap.get(pipeName).get(remoteIp)) {
- String pipeIdentifier =
- SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
- if (consume) {
- try {
- log.comsumePipeMsg(pipeIdentifier);
- } catch (IOException e) {
- logger.error(
- "Can not read pipe message about {} from disk because {}",
- pipeIdentifier,
- e.getMessage());
- }
- }
- if (pipeMessageMap.containsKey(pipeIdentifier)) {
- pipeMessageList = pipeMessageMap.get(pipeIdentifier);
- if (consume) {
- pipeMessageMap.remove(pipeIdentifier);
- }
- }
+ String pipeIdentifier = SyncPathUtil.getReceiverPipeFolderName(pipeName, remoteIp, createTime);
+ if (consume) {
+ try {
+ log.comsumePipeMsg(pipeIdentifier);
+ } catch (IOException e) {
+ logger.error(
+ "Can not read pipe message about {} from disk because {}",
+ pipeIdentifier,
+ e.getMessage());
+ }
+ }
+ if (pipeMessageMap.containsKey(pipeIdentifier)) {
+ pipeMessageList = pipeMessageMap.get(pipeIdentifier);
+ if (consume) {
+ pipeMessageMap.remove(pipeIdentifier);
}
}
return pipeMessageList;
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
index 12c7d0281c..9a1840c4b8 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLog.java
@@ -61,19 +61,19 @@ public class ReceiverLog {
}
public void createPipe(String pipeName, String remoteIp, long time) throws IOException {
- writeLog(pipeName, remoteIp, PipeStatus.RUNNING, time);
+ writeLog(pipeName, remoteIp, time);
}
- public void startPipe(String pipeName, String remoteIp) throws IOException {
- writeLog(pipeName, remoteIp, PipeStatus.RUNNING);
+ public void startPipe(String pipeName, String remoteIp, long time) throws IOException {
+ writeLog(pipeName, remoteIp, PipeStatus.RUNNING, time);
}
- public void stopPipe(String pipeName, String remoteIp) throws IOException {
- writeLog(pipeName, remoteIp, PipeStatus.STOP);
+ public void stopPipe(String pipeName, String remoteIp, long time) throws IOException {
+ writeLog(pipeName, remoteIp, PipeStatus.STOP, time);
}
- public void dropPipe(String pipeName, String remoteIp) throws IOException {
- writeLog(pipeName, remoteIp, PipeStatus.DROP);
+ public void dropPipe(String pipeName, String remoteIp, long time) throws IOException {
+ writeLog(pipeName, remoteIp, PipeStatus.DROP, time);
}
public void writePipeMsg(String pipeIdentifier, PipeMessage pipeMessage) throws IOException {
@@ -100,16 +100,16 @@ public class ReceiverLog {
if (pipeServerWriter == null) {
init();
}
- pipeServerWriter.write(String.format("%s,%s,%s,%d", pipeName, remoteIp, status, time));
+ pipeServerWriter.write(String.format("%s,%s,%d,%s", pipeName, remoteIp, time, status));
pipeServerWriter.newLine();
pipeServerWriter.flush();
}
- private void writeLog(String pipeName, String remoteIp, PipeStatus status) throws IOException {
+ private void writeLog(String pipeName, String remoteIp, long time) throws IOException {
if (pipeServerWriter == null) {
init();
}
- pipeServerWriter.write(String.format("%s,%s,%s", pipeName, remoteIp, status));
+ pipeServerWriter.write(String.format("%s,%s,%d", pipeName, remoteIp, time));
pipeServerWriter.newLine();
pipeServerWriter.flush();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
index 02dc2229fe..3442c1154c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzer.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
-import org.apache.iotdb.db.sync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe.PipeStatus;
@@ -43,12 +42,14 @@ public class ReceiverLogAnalyzer {
private static final Logger logger = LoggerFactory.getLogger(ReceiverLogAnalyzer.class);
// record recovery result of receiver server status
private boolean pipeServerEnable = false;
- private Map<String, Map<String, PipeInfo>> pipeInfoMap = new ConcurrentHashMap<>();
+ // <pipeName, <remoteIp, <createTime, status>>>
+ private Map<String, Map<String, Map<Long, PipeStatus>>> pipeInfos = new ConcurrentHashMap<>();
+ // <pipeFolderName, pipeMsg>
private Map<String, List<PipeMessage>> pipeMessageMap = new ConcurrentHashMap<>();
public void scan() throws StartupException {
logger.info("Start to recover all sync state for sync receiver.");
- pipeInfoMap = new ConcurrentHashMap<>();
+ pipeInfos = new ConcurrentHashMap<>();
pipeMessageMap = new ConcurrentHashMap<>();
pipeServerEnable = false;
File serviceLogFile = new File(SyncPathUtil.getSysDir(), SyncConstant.RECEIVER_LOG_NAME);
@@ -96,8 +97,8 @@ public class ReceiverLogAnalyzer {
return pipeServerEnable;
}
- public Map<String, Map<String, PipeInfo>> getPipeInfoMap() {
- return pipeInfoMap;
+ public Map<String, Map<String, Map<Long, PipeStatus>>> getPipeInfos() {
+ return pipeInfos;
}
public Map<String, List<PipeMessage>> getPipeMessageMap() {
@@ -118,21 +119,16 @@ public class ReceiverLogAnalyzer {
String[] items = logLine.split(",");
String pipeName = items[0];
String remoteIp = items[1];
- PipeStatus status = PipeStatus.valueOf(items[2]);
- if (status == PipeStatus.RUNNING) {
- if (!pipeInfoMap.containsKey(pipeName)) {
- pipeInfoMap.put(pipeName, new HashMap<>());
- }
- if (items.length == 4) {
- // create
- pipeInfoMap
- .get(pipeName)
- .put(remoteIp, new PipeInfo(pipeName, remoteIp, status, Long.parseLong(items[3])));
- } else {
- pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
- }
+ long createTime = Long.parseLong(items[2]);
+ if (items.length == 4) {
+ // start、stop、drop
+ PipeStatus status = PipeStatus.valueOf(items[3]);
+ pipeInfos.get(pipeName).get(remoteIp).put(createTime, status);
} else {
- pipeInfoMap.get(pipeName).get(remoteIp).setStatus(status);
+ // create
+ pipeInfos.putIfAbsent(pipeName, new HashMap<>());
+ pipeInfos.get(pipeName).putIfAbsent(remoteIp, new HashMap<>());
+ pipeInfos.get(pipeName).get(remoteIp).put(createTime, PipeStatus.STOP);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
index d21fb2973f..b4861053ef 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
@@ -215,7 +215,6 @@ public class SenderService implements IService {
public synchronized void stopPipe(String pipeName) throws PipeException {
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getStatus() == Pipe.PipeStatus.RUNNING) {
- sendMsg(RequestType.STOP);
runningPipe.stop();
transportHandler.stop();
}
@@ -225,7 +224,6 @@ public class SenderService implements IService {
public synchronized void startPipe(String pipeName) throws PipeException {
checkRunningPipeExistAndName(pipeName);
if (runningPipe.getStatus() == Pipe.PipeStatus.STOP) {
- sendMsg(RequestType.START);
runningPipe.start();
transportHandler.start();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
index 3869b5a4fc..cc5375d475 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/client/TransportClient.java
@@ -30,14 +30,7 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.service.SenderService;
import org.apache.iotdb.db.sync.transport.conf.TransportConstant;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.ResponseType;
-import org.apache.iotdb.service.transport.thrift.SyncRequest;
-import org.apache.iotdb.service.transport.thrift.SyncResponse;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.service.transport.thrift.*;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -299,7 +292,7 @@ public class TransportClient implements ITransportClient {
try {
status =
serviceClient.transportData(
- identityInfo, metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+ metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
} catch (TException e) {
// retry
logger.error("TException happened! ", e);
@@ -377,9 +370,7 @@ public class TransportClient implements ITransportClient {
file.getAbsoluteFile(), config.getMaxNumberOfSyncFileRetry()));
}
try {
- status =
- serviceClient.checkFileDigest(
- identityInfo, metaInfo, ByteBuffer.wrap(messageDigest.digest()));
+ status = serviceClient.checkFileDigest(metaInfo, ByteBuffer.wrap(messageDigest.digest()));
} catch (TException e) {
// retry
logger.error("TException happens! ", e);
@@ -420,7 +411,7 @@ public class TransportClient implements ITransportClient {
new MetaInfo(Type.findByValue(pipeData.getType().ordinal()), "fileName", 0);
TransportStatus status =
serviceClient.transportData(
- identityInfo, metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
+ metaInfo, buffToSend, ByteBuffer.wrap(messageDigest.digest()));
if (status.code == SUCCESS_CODE) {
break;
@@ -451,6 +442,14 @@ public class TransportClient implements ITransportClient {
throw new SyncConnectionException(
String.format("Handshake with receiver %s:%d error.", ipAddress, port));
}
+ SenderService.getInstance()
+ .receiveMsg(
+ heartbeat(
+ new SyncRequest(
+ RequestType.START,
+ pipe.getName(),
+ InetAddress.getLocalHost().getHostAddress(),
+ pipe.getCreateTime())));
while (!Thread.currentThread().isInterrupted()) {
PipeData pipeData = pipe.take();
if (!senderTransport(pipeData)) {
@@ -468,7 +467,7 @@ public class TransportClient implements ITransportClient {
}
} catch (InterruptedException e) {
logger.info("Interrupted by pipe, exit transport.");
- } catch (SyncConnectionException e) {
+ } catch (SyncConnectionException | UnknownHostException e) {
logger.error(
String.format("Connect to receiver %s:%d error, because %s.", ipAddress, port, e));
SenderService.getInstance()
@@ -509,7 +508,7 @@ public class TransportClient implements ITransportClient {
heartbeatTransport.open();
}
- return heartbeatClient.heartbeat(identityInfo, syncRequest);
+ return heartbeatClient.heartbeat(syncRequest);
} catch (TException e) {
logger.info(
String.format(
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
index f9e57bca69..c53702dace 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServiceImpl.java
@@ -28,13 +28,7 @@ import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueueFactory;
import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.service.transport.thrift.IdentityInfo;
-import org.apache.iotdb.service.transport.thrift.MetaInfo;
-import org.apache.iotdb.service.transport.thrift.SyncRequest;
-import org.apache.iotdb.service.transport.thrift.SyncResponse;
-import org.apache.iotdb.service.transport.thrift.TransportService;
-import org.apache.iotdb.service.transport.thrift.TransportStatus;
-import org.apache.iotdb.service.transport.thrift.Type;
+import org.apache.iotdb.service.transport.thrift.*;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -70,6 +64,11 @@ public class TransportServiceImpl implements TransportService.Iface {
private IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
private static final String RECORD_SUFFIX = ".record";
private static final String PATCH_SUFFIX = ".patch";
+ private ThreadLocal<IdentityInfo> identityInfoThreadLocal;
+
+ public TransportServiceImpl() {
+ identityInfoThreadLocal = new ThreadLocal<>();
+ }
private class CheckResult {
boolean result;
@@ -135,6 +134,7 @@ public class TransportServiceImpl implements TransportService.Iface {
@Override
public TransportStatus handshake(IdentityInfo identityInfo) throws TException {
logger.debug("Invoke handshake method from client ip = {}", identityInfo.address);
+ identityInfoThreadLocal.set(identityInfo);
// check ip address
if (!verifyIPSegment(config.getIpWhiteList(), identityInfo.address)) {
return new TransportStatus(
@@ -150,8 +150,8 @@ public class TransportServiceImpl implements TransportService.Iface {
identityInfo.version, config.getIoTDBVersion()));
}
- if (!new File(getFileDataDirPath(identityInfo)).exists()) {
- new File(getFileDataDirPath(identityInfo)).mkdirs();
+ if (!new File(SyncPathUtil.getFileDataDirPath(identityInfo)).exists()) {
+ new File(SyncPathUtil.getFileDataDirPath(identityInfo)).mkdirs();
}
return new TransportStatus(SUCCESS_CODE, "");
}
@@ -197,11 +197,11 @@ public class TransportServiceImpl implements TransportService.Iface {
}
@Override
- public TransportStatus transportData(
- IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
+ public TransportStatus transportData(MetaInfo metaInfo, ByteBuffer buff, ByteBuffer digest) {
+ IdentityInfo identityInfo = identityInfoThreadLocal.get();
logger.debug("Invoke transportData method from client ip = {}", identityInfo.address);
- String fileDir = getFileDataDirPath(identityInfo);
+ String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
Type type = metaInfo.type;
String fileName = metaInfo.fileName;
long startIndex = metaInfo.startIndex;
@@ -246,7 +246,7 @@ public class TransportServiceImpl implements TransportService.Iface {
// Do with file
handleTsFilePipeData((TsFilePipeData) pipeData, fileDir);
}
- PipeDataQueueFactory.getBufferedPipeDataQueue(getPipeLogDirPath(identityInfo))
+ PipeDataQueueFactory.getBufferedPipeDataQueue(SyncPathUtil.getPipeLogDirPath(identityInfo))
.offer(pipeData);
} catch (IOException | IllegalPathException e) {
logger.error("Pipe data transport error, {}", e.getMessage());
@@ -280,11 +280,10 @@ public class TransportServiceImpl implements TransportService.Iface {
}
@Override
- public TransportStatus checkFileDigest(
- IdentityInfo identityInfo, MetaInfo metaInfo, ByteBuffer digest) throws TException {
+ public TransportStatus checkFileDigest(MetaInfo metaInfo, ByteBuffer digest) throws TException {
+ IdentityInfo identityInfo = identityInfoThreadLocal.get();
logger.debug("Invoke checkFileDigest method from client ip = {}", identityInfo.address);
-
- String fileDir = getFileDataDirPath(identityInfo);
+ String fileDir = SyncPathUtil.getFileDataDirPath(identityInfo);
synchronized (fileDir.intern()) {
String fileName = metaInfo.fileName;
MessageDigest messageDigest = null;
@@ -326,8 +325,7 @@ public class TransportServiceImpl implements TransportService.Iface {
}
@Override
- public SyncResponse heartbeat(IdentityInfo identityInfo, SyncRequest syncRequest)
- throws TException {
+ public SyncResponse heartbeat(SyncRequest syncRequest) throws TException {
return ReceiverService.getInstance().receiveMsg(syncRequest);
}
@@ -343,8 +341,19 @@ public class TransportServiceImpl implements TransportService.Iface {
* release resources or cleanup when a client (a sender) is disconnected (normally or abnormally).
*/
public void handleClientExit() {
- // TODO: Handle client exit here.
- // do nothing now
+ // Handle client exit here.
+ IdentityInfo identityInfo = identityInfoThreadLocal.get();
+ if (identityInfo != null) {
+ // stop pipe
+ identityInfoThreadLocal.remove();
+ ReceiverService.getInstance()
+ .receiveMsg(
+ new SyncRequest(
+ RequestType.STOP,
+ identityInfo.getPipeName(),
+ identityInfo.getAddress(),
+ identityInfo.getCreateTime()));
+ }
}
/**
@@ -379,14 +388,4 @@ public class TransportServiceImpl implements TransportService.Iface {
String.format("Delete record file %s error, because %s.", recordFile.getPath(), e));
}
}
-
- private String getFileDataDirPath(IdentityInfo identityInfo) {
- return SyncPathUtil.getReceiverFileDataDir(
- identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
- }
-
- private String getPipeLogDirPath(IdentityInfo identityInfo) {
- return SyncPathUtil.getReceiverPipeLogDir(
- identityInfo.getPipeName(), identityInfo.getAddress(), identityInfo.getCreateTime());
- }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
index 2bc07dce99..eb4d2859cb 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
@@ -56,14 +56,14 @@ public class ReceiverManagerTest {
manager.createPipe(pipe1, ip1, 1);
manager.createPipe(pipe2, ip2, 2);
manager.createPipe(pipe1, ip2, 3);
- manager.stopPipe(pipe1, ip1);
- manager.stopPipe(pipe2, ip2);
- manager.dropPipe(pipe1, ip2);
- manager.startPipe(pipe1, ip1);
+ manager.stopPipe(pipe1, ip1, 1);
+ manager.stopPipe(pipe2, ip2, 2);
+ manager.dropPipe(pipe1, ip2, 3);
+ manager.startPipe(pipe1, ip1, 1);
List<PipeInfo> allPipeInfos = manager.getAllPipeInfos();
Assert.assertEquals(3, allPipeInfos.size());
- List<PipeInfo> pipeInfos1 = manager.getPipeInfos(pipe1);
- List<PipeInfo> pipeInfos2 = manager.getPipeInfos(pipe2);
+ List<PipeInfo> pipeInfos1 = manager.getPipeInfosByPipeName(pipe1);
+ List<PipeInfo> pipeInfos2 = manager.getPipeInfosByPipeName(pipe2);
Assert.assertEquals(2, pipeInfos1.size());
Assert.assertEquals(1, pipeInfos2.size());
for (PipeInfo pipeInfo : pipeInfos2) {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
index 6ced92cdd4..b97e7686e2 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -20,7 +20,6 @@ package org.apache.iotdb.db.sync.receiver.recovery;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
-import org.apache.iotdb.db.sync.receiver.manager.PipeInfo;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.sender.pipe.Pipe.PipeStatus;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -62,24 +61,24 @@ public class ReceiverLogAnalyzerTest {
log.createPipe(pipe1, ip1, 1);
log.createPipe(pipe2, ip2, 2);
log.createPipe(pipe1, ip2, 3);
- log.stopPipe(pipe1, ip1);
+ log.stopPipe(pipe1, ip1, 1);
log.stopPipeServer();
log.startPipeServer();
- log.stopPipe(pipe2, ip2);
- log.dropPipe(pipe1, ip2);
- log.startPipe(pipe1, ip1);
+ log.stopPipe(pipe2, ip2, 2);
+ log.dropPipe(pipe1, ip2, 3);
+ log.startPipe(pipe1, ip1, 1);
log.close();
ReceiverLogAnalyzer receiverLogAnalyzer = new ReceiverLogAnalyzer();
receiverLogAnalyzer.scan();
- Map<String, Map<String, PipeInfo>> map = receiverLogAnalyzer.getPipeInfoMap();
+ Map<String, Map<String, Map<Long, PipeStatus>>> map = receiverLogAnalyzer.getPipeInfos();
Assert.assertTrue(receiverLogAnalyzer.isPipeServerEnable());
Assert.assertNotNull(map);
Assert.assertEquals(2, map.get(pipe1).size());
Assert.assertEquals(1, map.get(pipe2).size());
Assert.assertEquals(1, map.get(pipe2).size());
- Assert.assertEquals(new PipeInfo(pipe2, ip2, PipeStatus.STOP, 2), map.get(pipe2).get(ip2));
- Assert.assertEquals(new PipeInfo(pipe1, ip1, PipeStatus.RUNNING, 1), map.get(pipe1).get(ip1));
- Assert.assertEquals(new PipeInfo(pipe1, ip2, PipeStatus.DROP, 3), map.get(pipe1).get(ip2));
+ Assert.assertEquals(PipeStatus.STOP, map.get(pipe2).get(ip2).get(2L));
+ Assert.assertEquals(PipeStatus.RUNNING, map.get(pipe1).get(ip1).get(1L));
+ Assert.assertEquals(PipeStatus.DROP, map.get(pipe1).get(ip2).get(3L));
} catch (Exception e) {
Assert.fail();
e.printStackTrace();
diff --git a/thrift-sync/src/main/thrift/transport.thrift b/thrift-sync/src/main/thrift/transport.thrift
index 74a4512ae3..d88c1e1400 100644
--- a/thrift-sync/src/main/thrift/transport.thrift
+++ b/thrift-sync/src/main/thrift/transport.thrift
@@ -84,7 +84,7 @@ struct SyncResponse{
service TransportService{
TransportStatus handshake(IdentityInfo info);
- TransportStatus transportData(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary buff, 4:binary digest);
- TransportStatus checkFileDigest(1:IdentityInfo identityInfo, 2:MetaInfo metaInfo, 3:binary digest);
- SyncResponse heartbeat(1:IdentityInfo identityInfo, 2:SyncRequest syncRequest)
+ TransportStatus transportData(1:MetaInfo metaInfo, 2:binary buff, 3:binary digest);
+ TransportStatus checkFileDigest(1:MetaInfo metaInfo, 2:binary digest);
+ SyncResponse heartbeat(SyncRequest syncRequest)
}