You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by do...@apache.org on 2019/04/19 05:50:47 UTC

[incubator-iotdb] branch cluster updated: Manage RaftNodeAsClient in cluster (#155)

This is an automated email from the ASF dual-hosted git repository.

dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster by this push:
     new 7cecbcb  Manage RaftNodeAsClient in cluster  (#155)
7cecbcb is described below

commit 7cecbcb9e88fd7d8e0316f6acefdedb65eea2b05
Author: Tianan Li <li...@163.com>
AuthorDate: Fri Apr 19 13:50:42 2019 +0800

    Manage RaftNodeAsClient in cluster  (#155)
---
 .../org/apache/iotdb/cluster/entity/Server.java    |   3 +-
 .../apache/iotdb/cluster/qp/ClusterQPExecutor.java |   9 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 165 +++++++++++++++------
 3 files changed, 120 insertions(+), 57 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index a4c02cd..4b2ebef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -88,6 +88,7 @@ public class Server {
     /** Stand-alone version of IoTDB, be careful to replace the internal JDBC Server with a cluster version **/
     iotdb = new IoTDB();
     iotdb.active();
+    CLIENT_MANAGER.init();
 
     /** Init raft groups **/
     PeerId[] peerIds = RaftUtils.convertStringArrayToPeerIdArray(CLUSTER_CONF.getNodes());
@@ -127,7 +128,7 @@ public class Server {
 
   }
 
-  public void stop() throws ProcessorException {
+  public void stop() throws ProcessorException, InterruptedException {
     QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
     iotdb.deactivate();
     CLIENT_MANAGER.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 31be86c..b9debe3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -200,14 +200,7 @@ public abstract class ClusterQPExecutor {
    * try to get raft rpc client
    */
   private NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
-    NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
-    if (client == null) {
-      throw new RaftConnectionException(String
-          .format("Raft inner rpc clients have reached the max numbers %s",
-              CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
-                  .getMaxQueueNumOfInnerRpcClient()));
-    }
-    return client;
+    return CLIENT_MANAGER.getRaftNodeAsClient();
   }
 
   /**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 178d0d4..fc5df44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -23,14 +23,16 @@ import com.alipay.remoting.exception.RemotingException;
 import com.alipay.sofa.jraft.entity.PeerId;
 import com.alipay.sofa.jraft.option.CliOptions;
 import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import java.util.LinkedList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.qp.callback.QPTask;
-import org.apache.iotdb.cluster.qp.callback.QPTask.TaskState;
 import org.apache.iotdb.cluster.config.ClusterConfig;
 import org.apache.iotdb.cluster.config.ClusterDescriptor;
 import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.callback.QPTask;
+import org.apache.iotdb.cluster.qp.callback.QPTask.TaskState;
 import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
 import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
 import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
@@ -62,16 +64,14 @@ public class RaftNodeAsClientManager {
   private static final int MAX_QUEUE_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
 
   /**
-   * RaftNodeAsClient singleton
+   * RaftNodeAsClient list
    */
-  private final RaftNodeAsClient client = new RaftNodeAsClient();
-
-  private boolean clientInited = false;
+  private final LinkedList<RaftNodeAsClient> clientList = new LinkedList<>();
 
   /**
    * Number of clients in use
    */
-  private AtomicInteger validClientNum = new AtomicInteger(0);
+  private AtomicInteger clientNumInUse = new AtomicInteger(0);
 
   /**
    * Number of requests for clients in queue
@@ -79,64 +79,135 @@ public class RaftNodeAsClientManager {
   private int queueClientNum = 0;
 
   /**
-   * Lock to update validClientNum
+   * Lock to update clientNumInUse
+   */
+  private ReentrantLock resourceLock = new ReentrantLock();
+
+  /**
+   * Mark whether system is shutting down
+   */
+  private volatile boolean isShuttingDown;
+
+  /**
+   * Interval of thread sleep, unit is millisecond.
    */
-  private ReentrantLock numLock = new ReentrantLock();
+  private static final int THREAD_SLEEP_INTERVAL = 10;
 
-  private RaftNodeAsClientManager(){
+  private RaftNodeAsClientManager() {
 
   }
 
+  public void init() {
+    isShuttingDown = false;
+  }
+
   /**
-   * Try to get client, return null if num of queue client exceeds threshold.
+   * Get a client; throws RaftConnectionException if the queue is full or system is shutting down.
    */
-  public RaftNodeAsClient getRaftNodeAsClient() {
+  public RaftNodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
     try {
-      if (!clientInited) {
-        client.init();
-      }
-
-      numLock.lock();
-      if (validClientNum.get() < MAX_VALID_CLIENT_NUM) {
-        validClientNum.incrementAndGet();
-        return client;
-      }
+      resourceLock.lock();
       if (queueClientNum >= MAX_QUEUE_CLIENT_NUM) {
-        return null;
+        throw new RaftConnectionException(String
+            .format("Raft inner rpc clients have reached the max numbers %s",
+                CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
+                    .getMaxQueueNumOfInnerRpcClient()));
+      }
+      checkShuttingDown();
+      if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+        clientNumInUse.incrementAndGet();
+        return getClient();
       }
       queueClientNum++;
-    }finally {
-      numLock.unlock();
+    } finally {
+      resourceLock.unlock();
     }
     return tryToGetClient();
   }
 
+  private void checkShuttingDown() throws RaftConnectionException {
+    if (isShuttingDown) {
+      throw new RaftConnectionException(
+          "Reject to provide RaftNodeAsClient client because cluster system is shutting down");
+    }
+  }
+
   /**
-   * Check whether it can get the client
+   * Poll until a client becomes available, then return it
    */
-  private RaftNodeAsClient tryToGetClient() {
-    for(;;){
-      if(validClientNum.get() < MAX_VALID_CLIENT_NUM){
-        try{
-          numLock.lock();
-          if(validClientNum.get() < MAX_VALID_CLIENT_NUM){
-            validClientNum.incrementAndGet();
+  private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
+    for (; ; ) {
+      if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+        resourceLock.lock();
+        try {
+          checkShuttingDown();
+          if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+            clientNumInUse.incrementAndGet();
             queueClientNum--;
-            return client;
+            return getClient();
           }
-        }finally {
-          numLock.unlock();
+        } catch (RaftConnectionException e) {
+          queueClientNum--;
+          throw new RaftConnectionException(e);
+        } finally {
+          resourceLock.unlock();
         }
       }
+      try {
+        Thread.sleep(THREAD_SLEEP_INTERVAL);
+      } catch (InterruptedException e) {
+        throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
+      }
+    }
+  }
+
+  /**
+   * Not thread-safe, caller must hold resourceLock: reuse a pooled client or create a new one
+   */
+  private RaftNodeAsClient getClient() {
+    if (clientList.isEmpty()) {
+      return new RaftNodeAsClient();
+    } else {
+      return clientList.removeFirst();
+    }
+  }
+
+  /**
+   * Release usage of a client
+   */
+  public void releaseClient(RaftNodeAsClient client) {
+    resourceLock.lock();
+    try {
+      clientNumInUse.decrementAndGet();
+      clientList.addLast(client);
+    } finally {
+      resourceLock.unlock();
+    }
+  }
+
+  public void shutdown() throws InterruptedException {
+    isShuttingDown = true;
+    while (clientNumInUse.get() != 0 && queueClientNum != 0) {
+      // wait until releasing all usage of clients.
+      Thread.sleep(THREAD_SLEEP_INTERVAL);
+    }
+    while (!clientList.isEmpty()) {
+      clientList.removeFirst().shutdown();
     }
   }
 
-  public void releaseClient() {
-    validClientNum.decrementAndGet();
+  /**
+   * Get client number in use
+   */
+  public int getClientNumInUse() {
+    return clientNumInUse.get();
   }
 
-  public void shutdown(){
-    client.shutdown();
+  /**
+   * Get client number in queue
+   */
+  public int getClientNumInQueue() {
+    return queueClientNum;
   }
 
   public static final RaftNodeAsClientManager getInstance() {
@@ -164,14 +235,13 @@ public class RaftNodeAsClientManager {
      */
     private BoltCliClientService boltClientService;
 
-    private RaftNodeAsClient(){
+    private RaftNodeAsClient() {
       init();
     }
 
-    private void init(){
+    private void init() {
       boltClientService = new BoltCliClientService();
       boltClientService.init(new CliOptions());
-      clientInited = true;
     }
 
     @Override
@@ -187,7 +257,7 @@ public class RaftNodeAsClientManager {
                   @Override
                   public void onResponse(Object result) {
                     BasicResponse response = (BasicResponse) result;
-                    releaseClient();
+                    releaseClient(RaftNodeAsClient.this);
                     qpTask.run(response);
                   }
 
@@ -195,7 +265,7 @@ public class RaftNodeAsClientManager {
                   public void onException(Throwable e) {
                     LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
                     qpTask.setTaskState(TaskState.EXCEPTION);
-                    releaseClient();
+                    releaseClient(RaftNodeAsClient.this);
                     qpTask.run(null);
                   }
 
@@ -207,7 +277,7 @@ public class RaftNodeAsClientManager {
       } catch (RemotingException | InterruptedException e) {
         LOGGER.error(e.getMessage());
         qpTask.setTaskState(TaskState.EXCEPTION);
-        releaseClient();
+        releaseClient(RaftNodeAsClient.this);
         qpTask.run(null);
         throw new RaftConnectionException(e);
       }
@@ -226,17 +296,16 @@ public class RaftNodeAsClientManager {
         qpTask.run(null);
         throw new RaftConnectionException(e);
       } finally {
-        releaseClient();
+        releaseClient(RaftNodeAsClient.this);
       }
     }
 
     /**
-     * Shut down client
+     * Shut down this client
      */
     @Override
     public void shutdown() {
       boltClientService.shutdown();
-      clientInited = false;
     }
 
   }