You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/08/16 14:33:09 UTC

[iotdb] branch rel/0.12 updated: Fix issues of blocking heartbeat broadcast and vote requesting caused by client reconnection (#3751) (#3757)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new c8455e5  Fix issues of blocking heartbeat broadcast and vote requesting caused by client reconnection (#3751) (#3757)
c8455e5 is described below

commit c8455e518908d027d93fe9362e74dc2c10a36d8f
Author: BaiJian <er...@hotmail.com>
AuthorDate: Mon Aug 16 22:32:42 2021 +0800

    Fix issues of blocking heartbeat broadcast and vote requesting caused by client reconnection (#3751) (#3757)
---
 .../cluster/server/heartbeat/HeartbeatThread.java  | 34 +++++++++++-----------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 67acc5f..d113e5d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -195,7 +195,6 @@ public class HeartbeatThread implements Runnable {
   }
 
   void sendHeartbeatSync(Node node) {
-    Client client = localMember.getSyncHeartbeatClient(node);
     HeartbeatHandler heartbeatHandler = new HeartbeatHandler(localMember, node);
     HeartBeatRequest req = new HeartBeatRequest();
     req.setCommitLogTerm(request.commitLogTerm);
@@ -211,11 +210,12 @@ public class HeartbeatThread implements Runnable {
       req.partitionTableBytes = request.partitionTableBytes;
       req.setPartitionTableBytesIsSet(true);
     }
-    if (client != null) {
-      localMember
-          .getSerialToParallelPool()
-          .submit(
-              () -> {
+    localMember
+        .getSerialToParallelPool()
+        .submit(
+            () -> {
+              Client client = localMember.getSyncHeartbeatClient(node);
+              if (client != null) {
                 try {
                   logger.debug("{}: Sending heartbeat to {}", memberName, node);
                   HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req);
@@ -230,8 +230,8 @@ public class HeartbeatThread implements Runnable {
                 } finally {
                   ClientUtils.putBackSyncHeartbeatClient(client);
                 }
-              });
-    }
+              }
+            });
   }
 
   /**
@@ -393,13 +393,13 @@ public class HeartbeatThread implements Runnable {
   }
 
   private void requestVoteSync(Node node, ElectionHandler handler, ElectionRequest request) {
-    Client client = localMember.getSyncHeartbeatClient(node);
-    if (client != null) {
-      logger.info("{}: Requesting a vote from {}", memberName, node);
-      localMember
-          .getSerialToParallelPool()
-          .submit(
-              () -> {
+    localMember
+        .getSerialToParallelPool()
+        .submit(
+            () -> {
+              Client client = localMember.getSyncHeartbeatClient(node);
+              if (client != null) {
+                logger.info("{}: Requesting a vote from {}", memberName, node);
                 try {
                   long result = client.startElection(request);
                   handler.onComplete(result);
@@ -413,7 +413,7 @@ public class HeartbeatThread implements Runnable {
                 } finally {
                   ClientUtils.putBackSyncHeartbeatClient(client);
                 }
-              });
-    }
+              }
+            });
   }
 }