You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2021/06/21 13:24:45 UTC

[iotdb] branch jira_1447_0.12 created (now c069fdd)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch jira_1447_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at c069fdd  fix synchronization key

This branch includes the following new commits:

     new c069fdd  fix synchronization key

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: fix synchronization key

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch jira_1447_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit c069fdd2af3d1c5a2a1a6ed365e9d6ed88fd9d2e
Author: jt2594838 <jt...@163.com>
AuthorDate: Mon Jun 21 21:23:48 2021 +0800

    fix synchronization key
---
 .../cluster/client/async/AsyncClientPool.java      | 22 +++++++++++-----------
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  6 +++---
 .../cluster/server/member/MetaGroupMember.java     | 14 +++++++++++++-
 3 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..8fc46f5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<AsyncClient> clientStack =
         clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -126,9 +126,9 @@ public class AsyncClientPool {
         this.wait(waitClientTimeutMS);
         if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
           logger.warn(
-              "Cannot get an available client after {}ms, create a new one.",
-              waitClientTimeutMS,
-              asyncClientFactory);
+              "{} Cannot get an available client after {}ms, create a new one.",
+              asyncClientFactory,
+              waitClientTimeutMS);
           AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
           return asyncClient;
@@ -171,9 +171,9 @@ public class AsyncClientPool {
   void onError(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
     // clean all cached clients when network fails
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       while (!clientStack.isEmpty()) {
         AsyncClient client = clientStack.pop();
         if (client instanceof AsyncDataClient) {
@@ -195,9 +195,9 @@ public class AsyncClientPool {
 
   void recreateClient(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       try {
         AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
         clientStack.push(asyncClient);
@@ -205,7 +205,7 @@ public class AsyncClientPool {
         logger.error("Cannot create a new client for {}", node, e);
         nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..0c4e3f1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
 
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -144,7 +144,7 @@ public class SyncClientPool {
     ClusterNode clusterNode = new ClusterNode(node);
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
         clientStack.push(client);
         NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
           NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index daa1aa9..c477d5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1003,6 +1003,11 @@ public class MetaGroupMember extends RaftMember {
       consistentNum.set(1);
       inconsistentNum.set(0);
       checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+      logger.debug(
+          "Status check result: {}-{}/{}",
+          consistentNum.get(),
+          inconsistentNum.get(),
+          getAllNodes().size());
       canEstablishCluster =
           analyseStartUpCheckResult(
               consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1032,7 +1037,14 @@ public class MetaGroupMember extends RaftMember {
       }
       pool.submit(
           () -> {
-            CheckStatusResponse response = checkStatus(seedNode);
+            logger.debug("Checking status with {}", seedNode);
+            CheckStatusResponse response = null;
+            try {
+              response = checkStatus(seedNode);
+            } catch (Exception e) {
+              logger.warn("Exception during status check", e);
+            }
+            logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
             if (response != null) {
               // check the response
               ClusterUtils.examineCheckStatusResponse(