You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/26 08:16:19 UTC
[iotdb] branch master updated: [IOTDB-3003] data sync pipe recover bug & other issues (#5665)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 718305dfcc [IOTDB-3003] data sync pipe recover bug & other issues (#5665)
718305dfcc is described below
commit 718305dfccdf40e9b4997c6228148cac819ad440
Author: Jamber <ja...@sina.com>
AuthorDate: Tue Apr 26 16:16:12 2022 +0800
[IOTDB-3003] data sync pipe recover bug & other issues (#5665)
Co-authored-by: haiyi.zb <ha...@alibaba-inc.com>
---
.../sync/pipedata/queue/BufferedPipeDataQueue.java | 3 +++
.../db/sync/receiver/manager/ReceiverManager.java | 28 ++++++++++------------
.../sync/receiver/manager/ReceiverManagerTest.java | 2 +-
.../receiver/recovery/ReceiverLogAnalyzerTest.java | 2 +-
4 files changed, 18 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index c6e7830613..88dcba6bd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -329,6 +329,9 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
commitSerialNumber += 1;
try {
PipeData commitData = pullOnePipeData(commitSerialNumber);
+ if (commitData == null) {
+ continue;
+ }
if (PipeData.PipeDataType.TSFILE.equals(commitData.getType())) {
List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles(false);
for (File file : tsFiles) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
index 7abbfed8a3..0ad998ea62 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
@@ -93,30 +93,28 @@ public class ReceiverManager {
public List<PipeInfo> getPipeInfosByPipeName(String pipeName) {
if (!pipeInfos.containsKey(pipeName)) {
return Collections.emptyList();
- } else {
- List<PipeInfo> res = new ArrayList<>();
- for (Map.Entry<String, Map<Long, PipeStatus>> remoteIpEntry :
- pipeInfos.get(pipeName).entrySet()) {
- for (Map.Entry<Long, PipeStatus> createTimeEntry : remoteIpEntry.getValue().entrySet()) {
- res.add(
- new PipeInfo(
- pipeName,
- remoteIpEntry.getKey(),
- createTimeEntry.getValue(),
- createTimeEntry.getKey()));
- }
+ }
+ List<PipeInfo> res = new ArrayList<>();
+ for (Map.Entry<String, Map<Long, PipeStatus>> remoteIpEntry :
+ pipeInfos.get(pipeName).entrySet()) {
+ for (Map.Entry<Long, PipeStatus> createTimeEntry : remoteIpEntry.getValue().entrySet()) {
+ res.add(
+ new PipeInfo(
+ pipeName,
+ remoteIpEntry.getKey(),
+ createTimeEntry.getValue(),
+ createTimeEntry.getKey()));
}
- return res;
}
+ return res;
}
public PipeInfo getPipeInfo(String pipeName, String remoteIp, long createTime) {
if (pipeInfos.containsKey(pipeName) && pipeInfos.get(pipeName).containsKey(remoteIp)) {
return new PipeInfo(
pipeName, remoteIp, pipeInfos.get(pipeName).get(remoteIp).get(createTime), createTime);
- } else {
- return null;
}
+ return null;
}
public List<PipeInfo> getAllPipeInfos() {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
index eb4d2859cb..e4df73aa46 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
@@ -91,8 +91,8 @@ public class ReceiverManagerTest {
Assert.assertEquals(error, messages.get(0));
manager.close();
} catch (Exception e) {
- Assert.fail();
e.printStackTrace();
+ Assert.fail();
}
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
index b97e7686e2..ac41da744e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -116,8 +116,8 @@ public class ReceiverLogAnalyzerTest {
Assert.assertEquals(info, map.get(pipeIdentifier1).get(1));
Assert.assertEquals(warn, map.get(pipeIdentifier1).get(2));
} catch (Exception e) {
- Assert.fail();
e.printStackTrace();
+ Assert.fail();
}
}
}