You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/06/23 02:38:49 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3429)
This is an automated email from the ASF dual-hosted git repository.
chaow pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 47b864f [To rel/0.12][IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3429)
47b864f is described below
commit 47b864fd487fb9187577ddc9853655a89886eba2
Author: Jiang Tian <jt...@163.com>
AuthorDate: Wed Jun 23 10:38:27 2021 +0800
[To rel/0.12][IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3429)
---
.../cluster/client/async/AsyncClientPool.java | 35 +++++++++++-----------
.../iotdb/cluster/client/sync/SyncClientPool.java | 8 ++---
.../cluster/server/member/MetaGroupMember.java | 14 ++++++++-
3 files changed, 35 insertions(+), 22 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..4ba3fe9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<AsyncClient> clientStack =
clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -123,12 +123,12 @@ public class AsyncClientPool {
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
try {
- this.wait(waitClientTimeutMS);
+ clientStack.wait(waitClientTimeutMS);
if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
logger.warn(
- "Cannot get an available client after {}ms, create a new one.",
- waitClientTimeutMS,
- asyncClientFactory);
+ "{} Cannot get an available client after {}ms, create a new one.",
+ asyncClientFactory,
+ waitClientTimeutMS);
AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
return asyncClient;
@@ -159,21 +159,22 @@ public class AsyncClientPool {
if (call != null) {
logger.warn("A using client {} is put back while running {}", client.hashCode(), call);
}
- synchronized (this) {
+
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
clientStack.push(client);
- this.notifyAll();
+ clientStack.notifyAll();
}
}
void onError(Node node) {
ClusterNode clusterNode = new ClusterNode(node);
// clean all cached clients when network fails
- synchronized (this) {
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
while (!clientStack.isEmpty()) {
AsyncClient client = clientStack.pop();
if (client instanceof AsyncDataClient) {
@@ -183,7 +184,7 @@ public class AsyncClientPool {
}
}
nodeClientNumMap.put(clusterNode, 0);
- this.notifyAll();
+ clientStack.notifyAll();
NodeStatusManager.getINSTANCE().deactivate(node);
}
}
@@ -195,9 +196,9 @@ public class AsyncClientPool {
void recreateClient(Node node) {
ClusterNode clusterNode = new ClusterNode(node);
- synchronized (this) {
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
try {
AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
clientStack.push(asyncClient);
@@ -205,7 +206,7 @@ public class AsyncClientPool {
logger.error("Cannot create a new client for {}", node, e);
nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
}
- this.notifyAll();
+ clientStack.notifyAll();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..2c279c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -113,7 +113,7 @@ public class SyncClientPool {
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
try {
- this.wait(waitClientTimeoutMS);
+ clientStack.wait(waitClientTimeoutMS);
if (clientStack.isEmpty()
&& System.currentTimeMillis() - waitStart >= waitClientTimeoutMS) {
logger.warn(
@@ -144,7 +144,7 @@ public class SyncClientPool {
ClusterNode clusterNode = new ClusterNode(node);
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
clientStack.push(client);
NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
NodeStatusManager.getINSTANCE().deactivate(node);
}
}
- this.notifyAll();
+ clientStack.notifyAll();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index daa1aa9..c477d5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1003,6 +1003,11 @@ public class MetaGroupMember extends RaftMember {
consistentNum.set(1);
inconsistentNum.set(0);
checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+ logger.debug(
+ "Status check result: {}-{}/{}",
+ consistentNum.get(),
+ inconsistentNum.get(),
+ getAllNodes().size());
canEstablishCluster =
analyseStartUpCheckResult(
consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1032,7 +1037,14 @@ public class MetaGroupMember extends RaftMember {
}
pool.submit(
() -> {
- CheckStatusResponse response = checkStatus(seedNode);
+ logger.debug("Checking status with {}", seedNode);
+ CheckStatusResponse response = null;
+ try {
+ response = checkStatus(seedNode);
+ } catch (Exception e) {
+ logger.warn("Exception during status check", e);
+ }
+ logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
if (response != null) {
// check the response
ClusterUtils.examineCheckStatusResponse(