You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2021/02/25 11:38:28 UTC

[GitHub] [iotdb] LebronAl commented on a change in pull request #2662: Fix statistics value in SyncClientPool/AsyncClientPool have concurren…

LebronAl commented on a change in pull request #2662:
URL: https://github.com/apache/iotdb/pull/2662#discussion_r582744864



##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
##########
@@ -81,19 +82,19 @@ public AsyncClient getClient(Node node, boolean activatedOnly) throws IOExceptio
 
     AsyncClient client;
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
-    Deque<AsyncClient> clientStack =
+    Deque<AsyncClient> clientDeque =
         clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
     synchronized (this) {
-      if (clientStack.isEmpty()) {
+      if (clientDeque.isEmpty()) {
         int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
         if (nodeClientNum >= maxConnectionForEachNode) {
-          client = waitForClient(clientStack, clusterNode, nodeClientNum);
+          client = waitForClient(clientDeque, clusterNode);
         } else {
           nodeClientNumMap.put(clusterNode, nodeClientNum + 1);
           client = asyncClientFactory.getAsyncClient(clusterNode, this);
         }
       } else {
-        client = clientStack.pop();
+        client = clientDeque.getFirst();

Review comment:
       why not use `pop()`? this client must be removed from this deque temporarily as it will be used in the caller

##########
File path: cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
##########
@@ -125,25 +137,25 @@ private Client waitForClient(Deque<Client> clientStack, ClusterNode node, int no
    * @param node
    * @param client
    */
-  void putClient(Node node, Client client) {
+  public void putClient(Node node, Client client) {
     ClusterNode clusterNode = new ClusterNode(node);
     // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
-    Deque<Client> clientStack = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
+    Deque<Client> clientDeque = clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
     synchronized (this) {
       if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
-        clientStack.push(client);
+        clientDeque.push(client);
         NodeStatusManager.getINSTANCE().activate(node);
       } else {
         try {
-          clientStack.push(syncClientFactory.getSyncClient(node, this));
+          clientDeque.push(syncClientFactory.getSyncClient(node, this));
           NodeStatusManager.getINSTANCE().activate(node);
+          this.notify();
         } catch (TTransportException e) {
           logger.error("Cannot open transport for client {}", node, e);
           nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue - 1);
           NodeStatusManager.getINSTANCE().deactivate(node);
         }
       }
-      this.notifyAll();

Review comment:
       I can understand why you change `notifyAll` to `notify`, but I don't know why remove this line here to above?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org