You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/21 13:24:46 UTC
[iotdb] 01/01: fix synchronization key
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch jira_1447_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c069fdd2af3d1c5a2a1a6ed365e9d6ed88fd9d2e
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Jun 21 21:23:48 2021 +0800
fix synchronization key
---
.../cluster/client/async/AsyncClientPool.java | 22 +++++++++++-----------
.../iotdb/cluster/client/sync/SyncClientPool.java | 6 +++---
.../cluster/server/member/MetaGroupMember.java | 14 +++++++++++++-
3 files changed, 27 insertions(+), 15 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..8fc46f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<AsyncClient> clientStack =
clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -126,9 +126,9 @@ public class AsyncClientPool {
this.wait(waitClientTimeutMS);
if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
logger.warn(
- "Cannot get an available client after {}ms, create a new one.",
- waitClientTimeutMS,
- asyncClientFactory);
+ "{} Cannot get an available client after {}ms, create a new one.",
+ asyncClientFactory,
+ waitClientTimeutMS);
AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
return asyncClient;
@@ -171,9 +171,9 @@ public class AsyncClientPool {
void onError(Node node) {
ClusterNode clusterNode = new ClusterNode(node);
// clean all cached clients when network fails
- synchronized (this) {
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
while (!clientStack.isEmpty()) {
AsyncClient client = clientStack.pop();
if (client instanceof AsyncDataClient) {
@@ -195,9 +195,9 @@ public class AsyncClientPool {
void recreateClient(Node node) {
ClusterNode clusterNode = new ClusterNode(node);
- synchronized (this) {
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ Deque<AsyncClient> clientStack =
+ clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+ synchronized (clientStack) {
try {
AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
clientStack.push(asyncClient);
@@ -205,7 +205,7 @@ public class AsyncClientPool {
logger.error("Cannot create a new client for {}", node, e);
nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
}
- this.notifyAll();
+ clientStack.notifyAll();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..0c4e3f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (clientStack.isEmpty()) {
int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
if (nodeClientNum >= maxConnectionForEachNode) {
@@ -144,7 +144,7 @@ public class SyncClientPool {
ClusterNode clusterNode = new ClusterNode(node);
// As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (this) {
+ synchronized (clientStack) {
if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
clientStack.push(client);
NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
NodeStatusManager.getINSTANCE().deactivate(node);
}
}
- this.notifyAll();
+ clientStack.notifyAll();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index daa1aa9..c477d5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1003,6 +1003,11 @@ public class MetaGroupMember extends RaftMember {
consistentNum.set(1);
inconsistentNum.set(0);
checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+ logger.debug(
+ "Status check result: {}-{}/{}",
+ consistentNum.get(),
+ inconsistentNum.get(),
+ getAllNodes().size());
canEstablishCluster =
analyseStartUpCheckResult(
consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1032,7 +1037,14 @@ public class MetaGroupMember extends RaftMember {
}
pool.submit(
() -> {
- CheckStatusResponse response = checkStatus(seedNode);
+ logger.debug("Checking status with {}", seedNode);
+ CheckStatusResponse response = null;
+ try {
+ response = checkStatus(seedNode);
+ } catch (Exception e) {
+ logger.warn("Exception during status check", e);
+ }
+ logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
if (response != null) {
// check the response
ClusterUtils.examineCheckStatusResponse(