You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/18 16:44:22 UTC
[incubator-iotdb] branch cluster_manage_client updated: fix some issues
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_manage_client
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_manage_client by this push:
new b4971bc fix some issues
b4971bc is described below
commit b4971bc44c3ea611dd0da9a142e76cf3b7ad3ed9
Author: lta <li...@163.com>
AuthorDate: Fri Apr 19 00:43:58 2019 +0800
fix some issues
---
.../org/apache/iotdb/cluster/entity/Server.java | 2 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 49 +++++++++++++---------
2 files changed, 31 insertions(+), 20 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index ed36c2f..4b2ebef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -128,7 +128,7 @@ public class Server {
}
- public void stop() throws ProcessorException {
+ public void stop() throws ProcessorException, InterruptedException {
QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
iotdb.deactivate();
CLIENT_MANAGER.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index efd427b..fc5df44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -86,14 +86,19 @@ public class RaftNodeAsClientManager {
/**
* Mark whether system is shutting down
*/
- private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+ private volatile boolean isShuttingDown;
- private RaftNodeAsClientManager(){
+ /**
+ * Interval of thread sleep, unit is millisecond.
+ */
+ private static final int THREAD_SLEEP_INTERVAL = 10;
+
+ private RaftNodeAsClientManager() {
}
public void init() {
- isShuttingDown.set(false);
+ isShuttingDown = false;
}
/**
@@ -114,14 +119,14 @@ public class RaftNodeAsClientManager {
return getClient();
}
queueClientNum++;
- }finally {
+ } finally {
resourceLock.unlock();
}
return tryToGetClient();
}
private void checkShuttingDown() throws RaftConnectionException {
- if (isShuttingDown.get()) {
+ if (isShuttingDown) {
throw new RaftConnectionException(
"Reject to provide RaftNodeAsClient client because cluster system is shutting down");
}
@@ -131,12 +136,12 @@ public class RaftNodeAsClientManager {
* Check whether it can get the clientList
*/
private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
- for(;;){
- if(clientNumInUse.get() < MAX_VALID_CLIENT_NUM){
+ for (; ; ) {
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
resourceLock.lock();
- try{
+ try {
checkShuttingDown();
- if(clientNumInUse.get() < MAX_VALID_CLIENT_NUM){
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
clientNumInUse.incrementAndGet();
queueClientNum--;
return getClient();
@@ -148,6 +153,11 @@ public class RaftNodeAsClientManager {
resourceLock.unlock();
}
}
+ try {
+ Thread.sleep(THREAD_SLEEP_INTERVAL);
+ } catch (InterruptedException e) {
+ throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
+ }
}
}
@@ -175,12 +185,13 @@ public class RaftNodeAsClientManager {
}
}
- public void shutdown(){
- isShuttingDown.set(true);
- while (clientNumInUse.get() != 0 && queueClientNum != 0){
+ public void shutdown() throws InterruptedException {
+ isShuttingDown = true;
+ while (clientNumInUse.get() != 0 && queueClientNum != 0) {
// wait until releasing all usage of clients.
+ Thread.sleep(THREAD_SLEEP_INTERVAL);
}
- while(!clientList.isEmpty()){
+ while (!clientList.isEmpty()) {
clientList.removeFirst().shutdown();
}
}
@@ -224,11 +235,11 @@ public class RaftNodeAsClientManager {
*/
private BoltCliClientService boltClientService;
- private RaftNodeAsClient(){
+ private RaftNodeAsClient() {
init();
}
- private void init(){
+ private void init() {
boltClientService = new BoltCliClientService();
boltClientService.init(new CliOptions());
}
@@ -237,7 +248,7 @@ public class RaftNodeAsClientManager {
public void asyncHandleRequest(BasicRequest request, PeerId leader,
QPTask qpTask)
throws RaftConnectionException {
- LOGGER.debug("Node as clientList to send request to leader: {}", leader);
+ LOGGER.debug("Node as client to send request to leader: {}", leader);
try {
boltClientService.getRpcClient()
.invokeWithCallback(leader.getEndpoint().toString(), request,
@@ -252,7 +263,7 @@ public class RaftNodeAsClientManager {
@Override
public void onException(Throwable e) {
- LOGGER.error("Bolt rpc clientList occurs errors when handling Request", e);
+ LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
qpTask.setTaskState(TaskState.EXCEPTION);
releaseClient(RaftNodeAsClient.this);
qpTask.run(null);
@@ -266,7 +277,7 @@ public class RaftNodeAsClientManager {
} catch (RemotingException | InterruptedException e) {
LOGGER.error(e.getMessage());
qpTask.setTaskState(TaskState.EXCEPTION);
- releaseClient(this);
+ releaseClient(RaftNodeAsClient.this);
qpTask.run(null);
throw new RaftConnectionException(e);
}
@@ -285,7 +296,7 @@ public class RaftNodeAsClientManager {
qpTask.run(null);
throw new RaftConnectionException(e);
} finally {
- releaseClient(this);
+ releaseClient(RaftNodeAsClient.this);
}
}