You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/21 13:24:46 UTC

[iotdb] 01/01: fix synchronization key

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch jira_1447_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c069fdd2af3d1c5a2a1a6ed365e9d6ed88fd9d2e
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Jun 21 21:23:48 2021 +0800

    fix synchronization key
---
 .../cluster/client/async/AsyncClientPool.java      | 22 +++++++++++-----------
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  6 +++---
 .../cluster/server/member/MetaGroupMember.java     | 14 +++++++++++++-
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..8fc46f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<AsyncClient> clientStack =
         clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -126,9 +126,9 @@ public class AsyncClientPool {
         this.wait(waitClientTimeutMS);
         if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
           logger.warn(
-              "Cannot get an available client after {}ms, create a new one.",
-              waitClientTimeutMS,
-              asyncClientFactory);
+              "{} Cannot get an available client after {}ms, create a new one.",
+              asyncClientFactory,
+              waitClientTimeutMS);
           AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
           return asyncClient;
@@ -171,9 +171,9 @@ public class AsyncClientPool {
   void onError(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
     // clean all cached clients when network fails
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       while (!clientStack.isEmpty()) {
         AsyncClient client = clientStack.pop();
         if (client instanceof AsyncDataClient) {
@@ -195,9 +195,9 @@ public class AsyncClientPool {
 
   void recreateClient(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       try {
         AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
         clientStack.push(asyncClient);
@@ -205,7 +205,7 @@ public class AsyncClientPool {
         logger.error("Cannot create a new client for {}", node, e);
         nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..0c4e3f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
 
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -144,7 +144,7 @@ public class SyncClientPool {
     ClusterNode clusterNode = new ClusterNode(node);
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
         clientStack.push(client);
         NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
           NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index daa1aa9..c477d5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1003,6 +1003,11 @@ public class MetaGroupMember extends RaftMember {
       consistentNum.set(1);
       inconsistentNum.set(0);
       checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+      logger.debug(
+          "Status check result: {}-{}/{}",
+          consistentNum.get(),
+          inconsistentNum.get(),
+          getAllNodes().size());
       canEstablishCluster =
           analyseStartUpCheckResult(
               consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1032,7 +1037,14 @@ public class MetaGroupMember extends RaftMember {
       }
       pool.submit(
           () -> {
-            CheckStatusResponse response = checkStatus(seedNode);
+            logger.debug("Checking status with {}", seedNode);
+            CheckStatusResponse response = null;
+            try {
+              response = checkStatus(seedNode);
+            } catch (Exception e) {
+              logger.warn("Exception during status check", e);
+            }
+            logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
             if (response != null) {
               // check the response
               ClusterUtils.examineCheckStatusResponse(