You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/07/13 10:51:58 UTC

[incubator-iotdb] branch cluster_new updated: fix bugs

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_new by this push:
     new efe1d98  fix bugs
     new 86be0a5  Merge pull request #1486 from LebronAl/cluster_new_forget_update_clientNumMap
efe1d98 is described below

commit efe1d9818b8aa31a0cc6e8b42b63b29220f85c03
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon Jul 13 13:12:24 2020 +0800

    fix bugs
---
 .../iotdb/cluster/client/async/AsyncClientPool.java       | 11 ++++++++++-
 .../iotdb/cluster/client/async/AsyncDataClient.java       | 15 ++++++++++++---
 .../iotdb/cluster/client/async/AsyncMetaClient.java       | 13 +++++++++++++
 3 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 2fa03d7..088e5e5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -47,6 +47,7 @@ public class AsyncClientPool {
 
   /**
    * Get a client of the given node from the cache if one is available, or create a new one.
+   *
    * @param node
    * @return
    * @throws IOException
@@ -77,7 +78,8 @@ public class AsyncClientPool {
     while (clientStack.isEmpty()) {
       try {
         this.wait(WAIT_CLIENT_TIMEOUT_MS);
-        if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) {
+        if (clientStack.isEmpty()
+            && System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) {
           logger.warn("Cannot get an available client after {}ms, create a new one",
               WAIT_CLIENT_TIMEOUT_MS);
           nodeClientNumMap.put(node, nodeClientNum + 1);
@@ -94,6 +96,7 @@ public class AsyncClientPool {
 
   /**
    * Return a client of a node to the pool. Closed clients should not be returned.
+   *
    * @param node
    * @param client
    */
@@ -105,4 +108,10 @@ public class AsyncClientPool {
       this.notifyAll();
     }
   }
+
+  public void removeClientForNodeClientNumMap(Node node) { // lowers the client count kept for node by one
+    synchronized (this) { // same monitor (this) that the pool's wait()/notifyAll() calls use
+      nodeClientNumMap.computeIfPresent(node, (k, v) -> v - 1); // no-op when node has no entry
+    }
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index 5f06230..9925ff1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -55,11 +55,16 @@ public class AsyncDataClient extends AsyncClient {
   @Override
   public void onComplete() {
     super.onComplete();
-    pool.putClient(node, this);
+    // the call has finished: give this client back to the pool, unless pool is null
+    if (pool != null) {
+      pool.putClient(node, this);
+    }
   }
 
-  public boolean isReady() {
-    return ___currentMethod == null;
+  /**
+   * Handles a failed call: runs the superclass error handling, then asks the pool to decrease
+   * the client count it keeps for this client's node. The pool is skipped when it is null,
+   * matching the guard in {@link #onComplete()}.
+   *
+   * @param e the exception reported for the failed call
+   */
+  @Override
+  public void onError(Exception e) {
+    super.onError(e);
+    // pool may be null (onComplete guards against it as well), so avoid an NPE here
+    if (pool != null) {
+      pool.removeClientForNodeClientNumMap(node);
+    }
   }
 
   public static class FactoryAsync implements AsyncClientFactory {
@@ -102,4 +107,8 @@ public class AsyncDataClient extends AsyncClient {
   public Node getNode() { // exposes the node this client passes to the pool in its callbacks
     return node;
   }
+
+  public boolean isReady() { // true exactly when ___currentMethod is null
+    return ___currentMethod == null; // NOTE(review): field not declared here; presumably set while a call is in flight - confirm
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
index 6e458f9..f2d781a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
@@ -60,6 +60,12 @@ public class AsyncMetaClient extends AsyncClient {
     }
   }
 
+  /**
+   * Handles a failed call: runs the superclass error handling, then asks the pool to decrease
+   * the client count it keeps for this client's node. Nothing is done with the pool when it is
+   * null.
+   *
+   * @param e the exception reported for the failed call
+   */
+  @Override
+  public void onError(Exception e) {
+    super.onError(e);
+    // NOTE(review): AsyncDataClient null-checks its pool before using it; guard here too so a
+    // client without a pool cannot throw a NullPointerException from its error callback
+    if (pool != null) {
+      pool.removeClientForNodeClientNumMap(node);
+    }
+  }
+
   public static class FactoryAsync implements AsyncClientFactory {
 
     private static TAsyncClientManager[] managers;
@@ -92,6 +98,13 @@ public class AsyncMetaClient extends AsyncClient {
     }
   }
 
+  /**
+   * Renders this client as {@code MetaClient{node=...}}, embedding the string form of its node.
+   */
+  @Override
+  public String toString() {
+    return "MetaClient{node=" + node + "}";
+  }
+
   public Node getNode() { // exposes the node this client reports to the pool in onError
     return node;
   }