You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/18 16:44:22 UTC

[incubator-iotdb] branch cluster_manage_client updated: fix some issues

This is an automated email from the ASF dual-hosted git repository.

lta pushed a commit to branch cluster_manage_client
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/cluster_manage_client by this push:
     new b4971bc  fix some issues
b4971bc is described below

commit b4971bc44c3ea611dd0da9a142e76cf3b7ad3ed9
Author: lta <li...@163.com>
AuthorDate: Fri Apr 19 00:43:58 2019 +0800

    fix some issues
---
 .../org/apache/iotdb/cluster/entity/Server.java    |  2 +-
 .../rpc/raft/impl/RaftNodeAsClientManager.java     | 49 +++++++++++++---------
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index ed36c2f..4b2ebef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -128,7 +128,7 @@ public class Server {
 
   }
 
-  public void stop() throws ProcessorException {
+  public void stop() throws ProcessorException, InterruptedException {
     QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
     iotdb.deactivate();
     CLIENT_MANAGER.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index efd427b..fc5df44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -86,14 +86,19 @@ public class RaftNodeAsClientManager {
   /**
    * Mark whether system is shutting down
    */
-  private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+  private volatile boolean isShuttingDown;
 
-  private RaftNodeAsClientManager(){
+  /**
+   * Interval of thread sleep, unit is millisecond.
+   */
+  private static final int THREAD_SLEEP_INTERVAL = 10;
+
+  private RaftNodeAsClientManager() {
 
   }
 
   public void init() {
-    isShuttingDown.set(false);
+    isShuttingDown = false;
   }
 
   /**
@@ -114,14 +119,14 @@ public class RaftNodeAsClientManager {
         return getClient();
       }
       queueClientNum++;
-    }finally {
+    } finally {
       resourceLock.unlock();
     }
     return tryToGetClient();
   }
 
   private void checkShuttingDown() throws RaftConnectionException {
-    if (isShuttingDown.get()) {
+    if (isShuttingDown) {
       throw new RaftConnectionException(
           "Reject to provide RaftNodeAsClient client because cluster system is shutting down");
     }
@@ -131,12 +136,12 @@ public class RaftNodeAsClientManager {
    * Check whether it can get the clientList
    */
   private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
-    for(;;){
-      if(clientNumInUse.get() < MAX_VALID_CLIENT_NUM){
+    for (; ; ) {
+      if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
         resourceLock.lock();
-        try{
+        try {
           checkShuttingDown();
-          if(clientNumInUse.get() < MAX_VALID_CLIENT_NUM){
+          if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
             clientNumInUse.incrementAndGet();
             queueClientNum--;
             return getClient();
@@ -148,6 +153,11 @@ public class RaftNodeAsClientManager {
           resourceLock.unlock();
         }
       }
+      try {
+        Thread.sleep(THREAD_SLEEP_INTERVAL);
+      } catch (InterruptedException e) {
+        throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
+      }
     }
   }
 
@@ -175,12 +185,13 @@ public class RaftNodeAsClientManager {
     }
   }
 
-  public void shutdown(){
-    isShuttingDown.set(true);
-    while (clientNumInUse.get() != 0 && queueClientNum != 0){
+  public void shutdown() throws InterruptedException {
+    isShuttingDown = true;
+    while (clientNumInUse.get() != 0 && queueClientNum != 0) {
       // wait until releasing all usage of clients.
+      Thread.sleep(THREAD_SLEEP_INTERVAL);
     }
-    while(!clientList.isEmpty()){
+    while (!clientList.isEmpty()) {
       clientList.removeFirst().shutdown();
     }
   }
@@ -224,11 +235,11 @@ public class RaftNodeAsClientManager {
      */
     private BoltCliClientService boltClientService;
 
-    private RaftNodeAsClient(){
+    private RaftNodeAsClient() {
       init();
     }
 
-    private void init(){
+    private void init() {
       boltClientService = new BoltCliClientService();
       boltClientService.init(new CliOptions());
     }
@@ -237,7 +248,7 @@ public class RaftNodeAsClientManager {
     public void asyncHandleRequest(BasicRequest request, PeerId leader,
         QPTask qpTask)
         throws RaftConnectionException {
-      LOGGER.debug("Node as clientList to send request to leader: {}", leader);
+      LOGGER.debug("Node as client to send request to leader: {}", leader);
       try {
         boltClientService.getRpcClient()
             .invokeWithCallback(leader.getEndpoint().toString(), request,
@@ -252,7 +263,7 @@ public class RaftNodeAsClientManager {
 
                   @Override
                   public void onException(Throwable e) {
-                    LOGGER.error("Bolt rpc clientList occurs errors when handling Request", e);
+                    LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
                     qpTask.setTaskState(TaskState.EXCEPTION);
                     releaseClient(RaftNodeAsClient.this);
                     qpTask.run(null);
@@ -266,7 +277,7 @@ public class RaftNodeAsClientManager {
       } catch (RemotingException | InterruptedException e) {
         LOGGER.error(e.getMessage());
         qpTask.setTaskState(TaskState.EXCEPTION);
-        releaseClient(this);
+        releaseClient(RaftNodeAsClient.this);
         qpTask.run(null);
         throw new RaftConnectionException(e);
       }
@@ -285,7 +296,7 @@ public class RaftNodeAsClientManager {
         qpTask.run(null);
         throw new RaftConnectionException(e);
       } finally {
-        releaseClient(this);
+        releaseClient(RaftNodeAsClient.this);
       }
     }