You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/08/16 14:33:09 UTC
[iotdb] branch rel/0.12 updated: Fix issues of blocking heartbeat
broadcast and vote requesting caused by client reconnection (#3751) (#3757)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new c8455e5 Fix issues of blocking heartbeat broadcast and vote requesting caused by client reconnection (#3751) (#3757)
c8455e5 is described below
commit c8455e518908d027d93fe9362e74dc2c10a36d8f
Author: BaiJian <er...@hotmail.com>
AuthorDate: Mon Aug 16 22:32:42 2021 +0800
Fix issues of blocking heartbeat broadcast and vote requesting caused by client reconnection (#3751) (#3757)
---
.../cluster/server/heartbeat/HeartbeatThread.java | 34 +++++++++++-----------
1 file changed, 17 insertions(+), 17 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index 67acc5f..d113e5d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -195,7 +195,6 @@ public class HeartbeatThread implements Runnable {
}
void sendHeartbeatSync(Node node) {
- Client client = localMember.getSyncHeartbeatClient(node);
HeartbeatHandler heartbeatHandler = new HeartbeatHandler(localMember, node);
HeartBeatRequest req = new HeartBeatRequest();
req.setCommitLogTerm(request.commitLogTerm);
@@ -211,11 +210,12 @@ public class HeartbeatThread implements Runnable {
req.partitionTableBytes = request.partitionTableBytes;
req.setPartitionTableBytesIsSet(true);
}
- if (client != null) {
- localMember
- .getSerialToParallelPool()
- .submit(
- () -> {
+ localMember
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client client = localMember.getSyncHeartbeatClient(node);
+ if (client != null) {
try {
logger.debug("{}: Sending heartbeat to {}", memberName, node);
HeartBeatResponse heartBeatResponse = client.sendHeartbeat(req);
@@ -230,8 +230,8 @@ public class HeartbeatThread implements Runnable {
} finally {
ClientUtils.putBackSyncHeartbeatClient(client);
}
- });
- }
+ }
+ });
}
/**
@@ -393,13 +393,13 @@ public class HeartbeatThread implements Runnable {
}
private void requestVoteSync(Node node, ElectionHandler handler, ElectionRequest request) {
- Client client = localMember.getSyncHeartbeatClient(node);
- if (client != null) {
- logger.info("{}: Requesting a vote from {}", memberName, node);
- localMember
- .getSerialToParallelPool()
- .submit(
- () -> {
+ localMember
+ .getSerialToParallelPool()
+ .submit(
+ () -> {
+ Client client = localMember.getSyncHeartbeatClient(node);
+ if (client != null) {
+ logger.info("{}: Requesting a vote from {}", memberName, node);
try {
long result = client.startElection(request);
handler.onComplete(result);
@@ -413,7 +413,7 @@ public class HeartbeatThread implements Runnable {
} finally {
ClientUtils.putBackSyncHeartbeatClient(client);
}
- });
- }
+ }
+ });
}
}