You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by do...@apache.org on 2019/04/19 05:50:47 UTC
[incubator-iotdb] branch cluster updated: Manage RaftNodeAsClient
in cluster (#155)
This is an automated email from the ASF dual-hosted git repository.
dope pushed a commit to branch cluster
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster by this push:
new 7cecbcb Manage RaftNodeAsClient in cluster (#155)
7cecbcb is described below
commit 7cecbcb9e88fd7d8e0316f6acefdedb65eea2b05
Author: Tianan Li <li...@163.com>
AuthorDate: Fri Apr 19 13:50:42 2019 +0800
Manage RaftNodeAsClient in cluster (#155)
---
.../org/apache/iotdb/cluster/entity/Server.java | 3 +-
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 9 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 165 +++++++++++++++------
3 files changed, 120 insertions(+), 57 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index a4c02cd..4b2ebef 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -88,6 +88,7 @@ public class Server {
/** Stand-alone version of IoTDB, be careful to replace the internal JDBC Server with a cluster version **/
iotdb = new IoTDB();
iotdb.active();
+ CLIENT_MANAGER.init();
/** Init raft groups **/
PeerId[] peerIds = RaftUtils.convertStringArrayToPeerIdArray(CLUSTER_CONF.getNodes());
@@ -127,7 +128,7 @@ public class Server {
}
- public void stop() throws ProcessorException {
+ public void stop() throws ProcessorException, InterruptedException {
QPTaskManager.getInstance().close(true, ClusterConstant.CLOSE_QP_SUB_TASK_BLOCK_TIMEOUT);
iotdb.deactivate();
CLIENT_MANAGER.shutdown();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 31be86c..b9debe3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -200,14 +200,7 @@ public abstract class ClusterQPExecutor {
* try to get raft rpc client
*/
private NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
- NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
- if (client == null) {
- throw new RaftConnectionException(String
- .format("Raft inner rpc clients have reached the max numbers %s",
- CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
- .getMaxQueueNumOfInnerRpcClient()));
- }
- return client;
+ return CLIENT_MANAGER.getRaftNodeAsClient();
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 178d0d4..fc5df44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -23,14 +23,16 @@ import com.alipay.remoting.exception.RemotingException;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import java.util.LinkedList;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.qp.callback.QPTask;
-import org.apache.iotdb.cluster.qp.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.callback.QPTask;
+import org.apache.iotdb.cluster.qp.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
@@ -62,16 +64,14 @@ public class RaftNodeAsClientManager {
private static final int MAX_QUEUE_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
/**
- * RaftNodeAsClient singleton
+ * RaftNodeAsClient list
*/
- private final RaftNodeAsClient client = new RaftNodeAsClient();
-
- private boolean clientInited = false;
+ private final LinkedList<RaftNodeAsClient> clientList = new LinkedList<>();
/**
* Number of clients in use
*/
- private AtomicInteger validClientNum = new AtomicInteger(0);
+ private AtomicInteger clientNumInUse = new AtomicInteger(0);
/**
* Number of requests for clients in queue
@@ -79,64 +79,135 @@ public class RaftNodeAsClientManager {
private int queueClientNum = 0;
/**
- * Lock to update validClientNum
+ * Lock to update clientNumInUse
+ */
+ private ReentrantLock resourceLock = new ReentrantLock();
+
+ /**
+ * Mark whether system is shutting down
+ */
+ private volatile boolean isShuttingDown;
+
+ /**
+ * Interval of thread sleep, unit is millisecond.
*/
- private ReentrantLock numLock = new ReentrantLock();
+ private static final int THREAD_SLEEP_INTERVAL = 10;
- private RaftNodeAsClientManager(){
+ private RaftNodeAsClientManager() {
}
+ public void init() {
+ isShuttingDown = false;
+ }
+
/**
- * Try to get client, return null if num of queue client exceeds threshold.
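+ * Try to get a client. Throws RaftConnectionException if the number of queued requests has reached the threshold or the system is shutting down; otherwise waits until a client is available.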
*/
- public RaftNodeAsClient getRaftNodeAsClient() {
+ public RaftNodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
try {
- if (!clientInited) {
- client.init();
- }
-
- numLock.lock();
- if (validClientNum.get() < MAX_VALID_CLIENT_NUM) {
- validClientNum.incrementAndGet();
- return client;
- }
+ resourceLock.lock();
if (queueClientNum >= MAX_QUEUE_CLIENT_NUM) {
- return null;
+ throw new RaftConnectionException(String
+ .format("Raft inner rpc clients have reached the max numbers %s",
+ CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
+ .getMaxQueueNumOfInnerRpcClient()));
+ }
+ checkShuttingDown();
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+ clientNumInUse.incrementAndGet();
+ return getClient();
}
queueClientNum++;
- }finally {
- numLock.unlock();
+ } finally {
+ resourceLock.unlock();
}
return tryToGetClient();
}
+ private void checkShuttingDown() throws RaftConnectionException {
+ if (isShuttingDown) {
+ throw new RaftConnectionException(
+ "Reject to provide RaftNodeAsClient client because cluster system is shutting down");
+ }
+ }
+
/**
- * Check whether it can get the client
+ * Poll until the number of clients in use drops below the limit, then return a client
*/
- private RaftNodeAsClient tryToGetClient() {
- for(;;){
- if(validClientNum.get() < MAX_VALID_CLIENT_NUM){
- try{
- numLock.lock();
- if(validClientNum.get() < MAX_VALID_CLIENT_NUM){
- validClientNum.incrementAndGet();
+ private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
+ for (; ; ) {
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+ resourceLock.lock();
+ try {
+ checkShuttingDown();
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+ clientNumInUse.incrementAndGet();
queueClientNum--;
- return client;
+ return getClient();
}
- }finally {
- numLock.unlock();
+ } catch (RaftConnectionException e) {
+ queueClientNum--;
+ throw new RaftConnectionException(e);
+ } finally {
+ resourceLock.unlock();
}
}
+ try {
+ Thread.sleep(THREAD_SLEEP_INTERVAL);
+ } catch (InterruptedException e) {
+ throw new RaftConnectionException("An error occurred when trying to get NodeAsClient", e);
+ }
+ }
+ }
+
+ /**
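+ * Not thread-safe on its own; call only while holding resourceLock. Reuses a pooled client if one exists, otherwise creates a new one.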
+ */
+ private RaftNodeAsClient getClient() {
+ if (clientList.isEmpty()) {
+ return new RaftNodeAsClient();
+ } else {
+ return clientList.removeFirst();
+ }
+ }
+
+ /**
+ * Release usage of a client
+ */
+ public void releaseClient(RaftNodeAsClient client) {
+ resourceLock.lock();
+ try {
+ clientNumInUse.decrementAndGet();
+ clientList.addLast(client);
+ } finally {
+ resourceLock.unlock();
+ }
+ }
+
+ public void shutdown() throws InterruptedException {
+ isShuttingDown = true;
+ while (clientNumInUse.get() != 0 && queueClientNum != 0) {
+ // wait until releasing all usage of clients.
+ Thread.sleep(THREAD_SLEEP_INTERVAL);
+ }
+ while (!clientList.isEmpty()) {
+ clientList.removeFirst().shutdown();
}
}
- public void releaseClient() {
- validClientNum.decrementAndGet();
+ /**
+ * Get client number in use
+ */
+ public int getClientNumInUse() {
+ return clientNumInUse.get();
}
- public void shutdown(){
- client.shutdown();
+ /**
+ * Get client number in queue
+ */
+ public int getClientNumInQueue() {
+ return queueClientNum;
}
public static final RaftNodeAsClientManager getInstance() {
@@ -164,14 +235,13 @@ public class RaftNodeAsClientManager {
*/
private BoltCliClientService boltClientService;
- private RaftNodeAsClient(){
+ private RaftNodeAsClient() {
init();
}
- private void init(){
+ private void init() {
boltClientService = new BoltCliClientService();
boltClientService.init(new CliOptions());
- clientInited = true;
}
@Override
@@ -187,7 +257,7 @@ public class RaftNodeAsClientManager {
@Override
public void onResponse(Object result) {
BasicResponse response = (BasicResponse) result;
- releaseClient();
+ releaseClient(RaftNodeAsClient.this);
qpTask.run(response);
}
@@ -195,7 +265,7 @@ public class RaftNodeAsClientManager {
public void onException(Throwable e) {
LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
qpTask.setTaskState(TaskState.EXCEPTION);
- releaseClient();
+ releaseClient(RaftNodeAsClient.this);
qpTask.run(null);
}
@@ -207,7 +277,7 @@ public class RaftNodeAsClientManager {
} catch (RemotingException | InterruptedException e) {
LOGGER.error(e.getMessage());
qpTask.setTaskState(TaskState.EXCEPTION);
- releaseClient();
+ releaseClient(RaftNodeAsClient.this);
qpTask.run(null);
throw new RaftConnectionException(e);
}
@@ -226,17 +296,16 @@ public class RaftNodeAsClientManager {
qpTask.run(null);
throw new RaftConnectionException(e);
} finally {
- releaseClient();
+ releaseClient(RaftNodeAsClient.this);
}
}
/**
- * Shut down client
+ * Shut down this client
*/
@Override
public void shutdown() {
boltClientService.shutdown();
- clientInited = false;
}
}