You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/07/13 10:51:58 UTC
[incubator-iotdb] branch cluster_new updated: fix bugs
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_new
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_new by this push:
new efe1d98 fix bugs
new 86be0a5 Merge pull request #1486 from LebronAl/cluster_new_forget_update_clientNumMap
efe1d98 is described below
commit efe1d9818b8aa31a0cc6e8b42b63b29220f85c03
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon Jul 13 13:12:24 2020 +0800
fix bugs
---
.../iotdb/cluster/client/async/AsyncClientPool.java | 11 ++++++++++-
.../iotdb/cluster/client/async/AsyncDataClient.java | 15 ++++++++++++---
.../iotdb/cluster/client/async/AsyncMetaClient.java | 13 +++++++++++++
3 files changed, 35 insertions(+), 4 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
index 2fa03d7..088e5e5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
@@ -47,6 +47,7 @@ public class AsyncClientPool {
/**
* Get a client of the given node from the cache if one is available, or create a new one.
+ *
* @param node
* @return
* @throws IOException
@@ -77,7 +78,8 @@ public class AsyncClientPool {
while (clientStack.isEmpty()) {
try {
this.wait(WAIT_CLIENT_TIMEOUT_MS);
- if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) {
+ if (clientStack.isEmpty()
+ && System.currentTimeMillis() - waitStart >= WAIT_CLIENT_TIMEOUT_MS) {
logger.warn("Cannot get an available client after {}ms, create a new one",
WAIT_CLIENT_TIMEOUT_MS);
nodeClientNumMap.put(node, nodeClientNum + 1);
@@ -94,6 +96,7 @@ public class AsyncClientPool {
/**
* Return a client of a node to the pool. Closed client should not be returned.
+ *
* @param node
* @param client
*/
@@ -105,4 +108,10 @@ public class AsyncClientPool {
this.notifyAll();
}
}
+
+ public void removeClientForNodeClientNumMap(Node node) {
+ synchronized (this) {
+ nodeClientNumMap.computeIfPresent(node, (k, v) -> v - 1);
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index 5f06230..9925ff1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -55,11 +55,16 @@ public class AsyncDataClient extends AsyncClient {
@Override
public void onComplete() {
super.onComplete();
- pool.putClient(node, this);
+ // return itself to the pool if the job is done
+ if (pool != null) {
+ pool.putClient(node, this);
+ }
}
- public boolean isReady() {
- return ___currentMethod == null;
+ @Override
+ public void onError(Exception e){
+ super.onError(e);
+ pool.removeClientForNodeClientNumMap(node);
}
public static class FactoryAsync implements AsyncClientFactory {
@@ -102,4 +107,8 @@ public class AsyncDataClient extends AsyncClient {
public Node getNode() {
return node;
}
+
+ public boolean isReady() {
+ return ___currentMethod == null;
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
index 6e458f9..f2d781a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
@@ -60,6 +60,12 @@ public class AsyncMetaClient extends AsyncClient {
}
}
+ @Override
+ public void onError(Exception e){
+ super.onError(e);
+ pool.removeClientForNodeClientNumMap(node);
+ }
+
public static class FactoryAsync implements AsyncClientFactory {
private static TAsyncClientManager[] managers;
@@ -92,6 +98,13 @@ public class AsyncMetaClient extends AsyncClient {
}
}
+ @Override
+ public String toString() {
+ return "MetaClient{" +
+ "node=" + node +
+ '}';
+ }
+
public Node getNode() {
return node;
}