You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/04/26 08:16:19 UTC

[iotdb] branch master updated: [IOTDB-3003] data sync pipe recover bug & other issues (#5665)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 718305dfcc [IOTDB-3003] data sync pipe recover bug & other issues (#5665)
718305dfcc is described below

commit 718305dfccdf40e9b4997c6228148cac819ad440
Author: Jamber <ja...@sina.com>
AuthorDate: Tue Apr 26 16:16:12 2022 +0800

    [IOTDB-3003] data sync pipe recover bug & other issues (#5665)
    
    Co-authored-by: haiyi.zb <ha...@alibaba-inc.com>
---
 .../sync/pipedata/queue/BufferedPipeDataQueue.java |  3 +++
 .../db/sync/receiver/manager/ReceiverManager.java  | 28 ++++++++++------------
 .../sync/receiver/manager/ReceiverManagerTest.java |  2 +-
 .../receiver/recovery/ReceiverLogAnalyzerTest.java |  2 +-
 4 files changed, 18 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
index c6e7830613..88dcba6bd0 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/pipedata/queue/BufferedPipeDataQueue.java
@@ -329,6 +329,9 @@ public class BufferedPipeDataQueue implements PipeDataQueue {
       commitSerialNumber += 1;
       try {
         PipeData commitData = pullOnePipeData(commitSerialNumber);
+        if (commitData == null) {
+          continue;
+        }
         if (PipeData.PipeDataType.TSFILE.equals(commitData.getType())) {
           List<File> tsFiles = ((TsFilePipeData) commitData).getTsFiles(false);
           for (File file : tsFiles) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
index 7abbfed8a3..0ad998ea62 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManager.java
@@ -93,30 +93,28 @@ public class ReceiverManager {
   public List<PipeInfo> getPipeInfosByPipeName(String pipeName) {
     if (!pipeInfos.containsKey(pipeName)) {
       return Collections.emptyList();
-    } else {
-      List<PipeInfo> res = new ArrayList<>();
-      for (Map.Entry<String, Map<Long, PipeStatus>> remoteIpEntry :
-          pipeInfos.get(pipeName).entrySet()) {
-        for (Map.Entry<Long, PipeStatus> createTimeEntry : remoteIpEntry.getValue().entrySet()) {
-          res.add(
-              new PipeInfo(
-                  pipeName,
-                  remoteIpEntry.getKey(),
-                  createTimeEntry.getValue(),
-                  createTimeEntry.getKey()));
-        }
+    }
+    List<PipeInfo> res = new ArrayList<>();
+    for (Map.Entry<String, Map<Long, PipeStatus>> remoteIpEntry :
+        pipeInfos.get(pipeName).entrySet()) {
+      for (Map.Entry<Long, PipeStatus> createTimeEntry : remoteIpEntry.getValue().entrySet()) {
+        res.add(
+            new PipeInfo(
+                pipeName,
+                remoteIpEntry.getKey(),
+                createTimeEntry.getValue(),
+                createTimeEntry.getKey()));
       }
-      return res;
     }
+    return res;
   }
 
   public PipeInfo getPipeInfo(String pipeName, String remoteIp, long createTime) {
     if (pipeInfos.containsKey(pipeName) && pipeInfos.get(pipeName).containsKey(remoteIp)) {
       return new PipeInfo(
           pipeName, remoteIp, pipeInfos.get(pipeName).get(remoteIp).get(createTime), createTime);
-    } else {
-      return null;
     }
+    return null;
   }
 
   public List<PipeInfo> getAllPipeInfos() {
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
index eb4d2859cb..e4df73aa46 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/manager/ReceiverManagerTest.java
@@ -91,8 +91,8 @@ public class ReceiverManagerTest {
       Assert.assertEquals(error, messages.get(0));
       manager.close();
     } catch (Exception e) {
-      Assert.fail();
       e.printStackTrace();
+      Assert.fail();
     }
   }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
index b97e7686e2..ac41da744e 100644
--- a/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/sync/receiver/recovery/ReceiverLogAnalyzerTest.java
@@ -116,8 +116,8 @@ public class ReceiverLogAnalyzerTest {
       Assert.assertEquals(info, map.get(pipeIdentifier1).get(1));
       Assert.assertEquals(warn, map.get(pipeIdentifier1).get(2));
     } catch (Exception e) {
-      Assert.fail();
       e.printStackTrace();
+      Assert.fail();
     }
   }
 }