You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/24 12:55:33 UTC
[incubator-iotdb] branch cluster_read updated: reimple raft node as
client manager
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_read by this push:
new 49d05d0 reimple raft node as client manager
49d05d0 is described below
commit 49d05d0827f6b68c66609e15fade11d21cef1b36
Author: lta <li...@163.com>
AuthorDate: Wed Apr 24 20:55:17 2019 +0800
reimple raft node as client manager
---
.../rpc/raft/impl/RaftNodeAsClientManager.java | 46 ++++++++++++----------
1 file changed, 26 insertions(+), 20 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index e76aec9..b7ee3b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -26,16 +26,18 @@ import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
import java.util.LinkedList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
+import org.apache.iotdb.cluster.qp.task.QueryTask;
import org.apache.iotdb.cluster.qp.task.SingleQPTask;
import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +83,12 @@ public class RaftNodeAsClientManager {
/**
* Lock to update clientNumInUse
*/
- private ReentrantLock resourceLock = new ReentrantLock();
+ private Lock resourceLock = new ReentrantLock();
+
+ /**
+ * Condition to get client
+ */
+ private Condition resourceCondition = resourceLock.newCondition();
/**
* Mark whether system is shutting down
@@ -105,8 +112,8 @@ public class RaftNodeAsClientManager {
* Try to get clientList, return null if num of queue clientList exceeds threshold.
*/
public RaftNodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
+ resourceLock.lock();
try {
- resourceLock.lock();
if (queueClientNum >= MAX_QUEUE_CLIENT_NUM) {
throw new RaftConnectionException(String
.format("Raft inner rpc clients have reached the max numbers %s",
@@ -118,7 +125,6 @@ public class RaftNodeAsClientManager {
clientNumInUse.incrementAndGet();
return getClient();
}
- queueClientNum++;
} finally {
resourceLock.unlock();
}
@@ -136,29 +142,26 @@ public class RaftNodeAsClientManager {
* Check whether it can get the clientList
*/
private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
- for (; ; ) {
- if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
- resourceLock.lock();
- try {
+ resourceLock.lock();
+ queueClientNum++;
+ try {
+ while (true) {
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
checkShuttingDown();
if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
clientNumInUse.incrementAndGet();
- queueClientNum--;
return getClient();
}
- } catch (RaftConnectionException e) {
- queueClientNum--;
- throw new RaftConnectionException(e);
- } finally {
- resourceLock.unlock();
}
+ resourceCondition.await();
}
- try {
- Thread.sleep(THREAD_SLEEP_INTERVAL);
- } catch (InterruptedException e) {
- throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
- }
+ } catch (InterruptedException e) {
+ throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
+ } finally {
+ queueClientNum--;
+ resourceLock.unlock();
}
+
}
/**
@@ -179,6 +182,9 @@ public class RaftNodeAsClientManager {
resourceLock.lock();
try {
clientNumInUse.decrementAndGet();
+ if (clientList.isEmpty() && queueClientNum > 0) {
+ resourceCondition.signal();
+ }
clientList.addLast(client);
} finally {
resourceLock.unlock();