You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/22 01:49:56 UTC

[iotdb] branch jira_1447 created (now 12f7dd6)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch jira_1447
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 12f7dd6  fix synchronization key

This branch includes the following new commits:

     new 12f7dd6  fix synchronization key

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix synchronization key

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch jira_1447
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 12f7dd6eb7d962eb7ab4de7e5990d80cfb18db6f
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Jun 21 21:23:48 2021 +0800

    fix synchronization key
---
 .../cluster/client/async/AsyncClientPool.java      | 22 +++++++++++-----------
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  6 +++---
 .../cluster/server/member/MetaGroupMember.java     | 14 +++++++++++++-
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..8fc46f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<AsyncClient> clientStack =
         clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -126,9 +126,9 @@ public class AsyncClientPool {
         this.wait(waitClientTimeutMS);
         if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
           logger.warn(
-              "Cannot get an available client after {}ms, create a new one.",
-              waitClientTimeutMS,
-              asyncClientFactory);
+              "{} Cannot get an available client after {}ms, create a new one.",
+              asyncClientFactory,
+              waitClientTimeutMS);
           AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
           return asyncClient;
@@ -171,9 +171,9 @@ public class AsyncClientPool {
   void onError(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
     // clean all cached clients when network fails
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       while (!clientStack.isEmpty()) {
         AsyncClient client = clientStack.pop();
         if (client instanceof AsyncDataClient) {
@@ -195,9 +195,9 @@ public class AsyncClientPool {
 
   void recreateClient(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       try {
         AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
         clientStack.push(asyncClient);
@@ -205,7 +205,7 @@ public class AsyncClientPool {
         logger.error("Cannot create a new client for {}", node, e);
         nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..0c4e3f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
 
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -144,7 +144,7 @@ public class SyncClientPool {
     ClusterNode clusterNode = new ClusterNode(node);
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
         clientStack.push(client);
         NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
           NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 0224c29..21dfd68 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1001,6 +1001,11 @@ public class MetaGroupMember extends RaftMember {
       consistentNum.set(1);
       inconsistentNum.set(0);
       checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+      logger.debug(
+          "Status check result: {}-{}/{}",
+          consistentNum.get(),
+          inconsistentNum.get(),
+          getAllNodes().size());
       canEstablishCluster =
           analyseStartUpCheckResult(
               consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1030,7 +1035,14 @@ public class MetaGroupMember extends RaftMember {
       }
       pool.submit(
           () -> {
-            CheckStatusResponse response = checkStatus(seedNode);
+            logger.debug("Checking status with {}", seedNode);
+            CheckStatusResponse response = null;
+            try {
+              response = checkStatus(seedNode);
+            } catch (Exception e) {
+              logger.warn("Exception during status check", e);
+            }
+            logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
             if (response != null) {
               // check the response
               ClusterUtils.examineCheckStatusResponse(