You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ch...@apache.org on 2021/06/23 02:38:49 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3429)

This is an automated email from the ASF dual-hosted git repository.

chaow pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 47b864f  [To rel/0.12][IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3429)
47b864f is described below

commit 47b864fd487fb9187577ddc9853655a89886eba2
Author: Jiang Tian <jt...@163.com>
AuthorDate: Wed Jun 23 10:38:27 2021 +0800

    [To rel/0.12][IOTDB-1447] ClientPool is blocking other nodes when one node fails (#3429)
---
 .../cluster/client/async/AsyncClientPool.java      | 35 +++++++++++-----------
 .../iotdb/cluster/client/sync/SyncClientPool.java  |  8 ++---
 .../cluster/server/member/MetaGroupMember.java     | 14 ++++++++-
 3 files changed, 35 insertions(+), 22 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index a7441e4..4ba3fe9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -85,7 +85,7 @@ public class AsyncClientPool {
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<AsyncClient> clientStack =
         clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -123,12 +123,12 @@ public class AsyncClientPool {
     long waitStart = System.currentTimeMillis();
     while (clientStack.isEmpty()) {
       try {
-        this.wait(waitClientTimeutMS);
+        clientStack.wait(waitClientTimeutMS);
         if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
           logger.warn(
-              "Cannot get an available client after {}ms, create a new one.",
-              waitClientTimeutMS,
-              asyncClientFactory);
+              "{} Cannot get an available client after {}ms, create a new one.",
+              asyncClientFactory,
+              waitClientTimeutMS);
           AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
           return asyncClient;
@@ -159,21 +159,22 @@ public class AsyncClientPool {
     if (call != null) {
       logger.warn("A using client {} is put back while running {}", client.hashCode(), call);
     }
-    synchronized (this) {
+
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
       clientStack.push(client);
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
   void onError(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
     // clean all cached clients when network fails
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       while (!clientStack.isEmpty()) {
         AsyncClient client = clientStack.pop();
         if (client instanceof AsyncDataClient) {
@@ -183,7 +184,7 @@ public class AsyncClientPool {
         }
       }
       nodeClientNumMap.put(clusterNode, 0);
-      this.notifyAll();
+      clientStack.notifyAll();
       NodeStatusManager.getINSTANCE().deactivate(node);
     }
   }
@@ -195,9 +196,9 @@ public class AsyncClientPool {
 
   void recreateClient(Node node) {
     ClusterNode clusterNode = new ClusterNode(node);
-    synchronized (this) {
-      Deque<AsyncClient> clientStack =
-          clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<AsyncClient> clientStack =
+        clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    synchronized (clientStack) {
       try {
         AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
         clientStack.push(asyncClient);
@@ -205,7 +206,7 @@ public class AsyncClientPool {
         logger.error("Cannot create a new client for {}", node, e);
         nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
index 758296f..2c279c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
@@ -80,7 +80,7 @@ public class SyncClientPool {
 
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (clientStack.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
@@ -113,7 +113,7 @@ public class SyncClientPool {
     long waitStart = System.currentTimeMillis();
     while (clientStack.isEmpty()) {
       try {
-        this.wait(waitClientTimeoutMS);
+        clientStack.wait(waitClientTimeoutMS);
         if (clientStack.isEmpty()
             && System.currentTimeMillis() - waitStart >= waitClientTimeoutMS) {
           logger.warn(
@@ -144,7 +144,7 @@ public class SyncClientPool {
     ClusterNode clusterNode = new ClusterNode(node);
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
     Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-    synchronized (this) {
+    synchronized (clientStack) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
         clientStack.push(client);
         NodeStatusManager.getINSTANCE().activate(node);
@@ -158,7 +158,7 @@ public class SyncClientPool {
           NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
-      this.notifyAll();
+      clientStack.notifyAll();
     }
   }
 
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index daa1aa9..c477d5f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -1003,6 +1003,11 @@ public class MetaGroupMember extends RaftMember {
       consistentNum.set(1);
       inconsistentNum.set(0);
       checkSeedNodesStatusOnce(consistentNum, inconsistentNum);
+      logger.debug(
+          "Status check result: {}-{}/{}",
+          consistentNum.get(),
+          inconsistentNum.get(),
+          getAllNodes().size());
       canEstablishCluster =
           analyseStartUpCheckResult(
               consistentNum.get(), inconsistentNum.get(), getAllNodes().size());
@@ -1032,7 +1037,14 @@ public class MetaGroupMember extends RaftMember {
       }
       pool.submit(
           () -> {
-            CheckStatusResponse response = checkStatus(seedNode);
+            logger.debug("Checking status with {}", seedNode);
+            CheckStatusResponse response = null;
+            try {
+              response = checkStatus(seedNode);
+            } catch (Exception e) {
+              logger.warn("Exception during status check", e);
+            }
+            logger.debug("CheckStatusResponse from {}: {}", seedNode, response);
             if (response != null) {
               // check the response
               ClusterUtils.examineCheckStatusResponse(