You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/24 12:55:33 UTC

[incubator-iotdb] branch cluster_read updated: reimple raft node as client manager

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_read
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_read by this push:
     new 49d05d0  reimple raft node as client manager
49d05d0 is described below

commit 49d05d0827f6b68c66609e15fade11d21cef1b36
Author: lta <li...@163.com>
AuthorDate: Wed Apr 24 20:55:17 2019 +0800

    reimple raft node as client manager
---
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 46 ++++++++++++----------
 1 file changed, 26 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index e76aec9..b7ee3b6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -26,16 +26,18 @@ import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
 import java.util.LinkedList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.task.QPTask.TaskState;
+import org.apache.iotdb.cluster.qp.task.QueryTask;
 import org.apache.iotdb.cluster.qp.task.SingleQPTask;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
-import org.apache.iotdb.cluster.qp.task.QueryTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,7 +83,12 @@ public class RaftNodeAsClientManager {
   /**
    * Lock to update clientNumInUse
    */
-  private ReentrantLock resourceLock = new ReentrantLock();
+  private Lock resourceLock = new ReentrantLock();
+
+  /**
+   * Condition to get client
+   */
+  private Condition resourceCondition = resourceLock.newCondition();
 
   /**
    * Mark whether system is shutting down
@@ -105,8 +112,8 @@ public class RaftNodeAsClientManager {
    * Try to get clientList, return null if num of queue clientList exceeds threshold.
    */
   public RaftNodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
+    resourceLock.lock();
     try {
-      resourceLock.lock();
       if (queueClientNum >= MAX_QUEUE_CLIENT_NUM) {
         throw new RaftConnectionException(String
             .format("Raft inner rpc clients have reached the max numbers %s",
@@ -118,7 +125,6 @@ public class RaftNodeAsClientManager {
         clientNumInUse.incrementAndGet();
         return getClient();
       }
-      queueClientNum++;
     } finally {
       resourceLock.unlock();
     }
@@ -136,29 +142,26 @@ public class RaftNodeAsClientManager {
    * Check whether it can get the clientList
    */
   private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
-    for (; ; ) {
-      if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
-        resourceLock.lock();
-        try {
+    resourceLock.lock();
+    queueClientNum++;
+    try {
+      while (true) {
+        if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
           checkShuttingDown();
           if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
             clientNumInUse.incrementAndGet();
-            queueClientNum--;
             return getClient();
           }
-        } catch (RaftConnectionException e) {
-          queueClientNum--;
-          throw new RaftConnectionException(e);
-        } finally {
-          resourceLock.unlock();
         }
+        resourceCondition.await();
       }
-      try {
-        Thread.sleep(THREAD_SLEEP_INTERVAL);
-      } catch (InterruptedException e) {
-        throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
-      }
+    } catch (InterruptedException e) {
+      throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
+    } finally {
+      queueClientNum--;
+      resourceLock.unlock();
     }
+
   }
 
   /**
@@ -179,6 +182,9 @@ public class RaftNodeAsClientManager {
     resourceLock.lock();
     try {
       clientNumInUse.decrementAndGet();
+      if (clientList.isEmpty() && queueClientNum > 0) {
+        resourceCondition.signal();
+      }
       clientList.addLast(client);
     } finally {
       resourceLock.unlock();