You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ne...@apache.org on 2021/06/26 03:59:37 UTC
[iotdb] branch master updated: [IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3430)
This is an automated email from the ASF dual-hosted git repository.
neuyilan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 974a959 [IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3430)
974a959 is described below
commit 974a95938410c11323aeb51dd2bfeaea14439735
Author: Jiang Tian <jt...@163.com>
AuthorDate: Sat Jun 26 11:59:10 2021 +0800
[IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3430)
* fix synchronization key
* fix lock object
---
.../cluster/client/async/AsyncClientPool.java | 36 +++++++++++-----------
.../iotdb/cluster/client/sync/SyncClientPool.java | 8 ++---
.../cluster/server/member/MetaGroupMember.java | 14 ++++++++-
3 files changed, 35 insertions(+), 23 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..bf0370f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<AsyncClient> clientStack =
clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -123,12 +123,12 @@ public class AsyncClientPool {
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
try {
- this.wait(waitClientTimeutMS);
+ clientStack.wait(waitClientTimeutMS);
if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
logger.warn(
- "Cannot get an available client after {}ms, create a new one.",
- waitClientTimeutMS,
- asyncClientFactory);
+ "{} Cannot get an available client after {}ms, create a new one.",
+ asyncClientFactory,
+ waitClientTimeutMS);
AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
return asyncClient;
@@ -159,21 +159,21 @@ public class AsyncClientPool {
if (call != null) {
logger.warn("A using client {} is put back while running {}", client.hashCode(), call);
}
- synchronized (this) {
- // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
clientStack.push(client);
- this.notifyAll();
+ clientStack.notifyAll();
}
}
void onError(Node node) {
ClusterNode clusterNode = new ClusterNode(node);
// clean all cached clients when network fails
- synchronized (this) {
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
while (!clientStack.isEmpty()) {
AsyncClient client = clientStack.pop();
if (client instanceof AsyncDataClient) {
@@ -183,7 +183,7 @@ public class AsyncClientPool {
}
}
nodeClientNumMap.put(clusterNode, 0);
- this.notifyAll();
+ clientStack.notifyAll();
NodeStatusManager.getINSTANCE().deactivate(node);
}
}
@@ -195,9 +195,9 @@ public class AsyncClientPool {
void recreateClient(Node node) {
ClusterNode clusterNode = new ClusterNode(node);
- synchronized (this) {
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
try {
AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
clientStack.push(asyncClient);
@@ -205,7 +205,7 @@ public class AsyncClientPool {
logger.error("Cannot create a new client for {}", node, e);
nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
}
- this.notifyAll();
+ clientStack.notifyAll();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..2c279c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -113,7 +113,7 @@ public class SyncClientPool {
long waitStart = System.currentTimeMillis();
while (clientStack.isEmpty()) {
try {
- this.wait(waitClientTimeoutMS);
+ clientStack.wait(waitClientTimeoutMS);
if (clientStack.isEmpty()
&& System.currentTimeMillis() - waitStart >= waitClientTimeoutMS) {
logger.warn(
@@ -144,7 +144,7 @@ public class SyncClientPool {
ClusterNode clusterNode = new ClusterNode(node);
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
clientStack.push(client);
NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
NodeStatusManager.getINSTANCE().deactivate(node);
}
}
- this.notifyAll();
+ clientStack.notifyAll();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 0224c29..21dfd68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1001,6 +1001,11 @@ public class MetaGroupMember extends RaftMember {
consistentNum.set(1);
inconsistentNum.set(0);
checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+ logger.debug(
+ "Status check result: {}-{}/{}",
+ consistentNum.get(),
+ inconsistentNum.get(),
+ getAllNodes().size());
canEstablishCluster =
analyseStartUpCheckResult(
consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1030,7 +1035,14 @@ public class MetaGroupMember extends RaftMember {
}
pool.submit(
() -> {
- CheckStatusResponse response = checkStatus(seedNode);
+ logger.debug("Checking status with {}", seedNode);
+ CheckStatusResponse response = null;
+ try {
+ response = checkStatus(seedNode);
+ } catch (Exception e) {
+ logger.warn("Exception during status check", e);
+ }
+ logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
if (response != null) {
// check the response
ClusterUtils.examineCheckStatusResponse(