You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by lt...@apache.org on 2019/04/18 13:27:28 UTC
[incubator-iotdb] branch cluster_manage_client updated: add init
This is an automated email from the ASF dual-hosted git repository.
lta pushed a commit to branch cluster_manage_client
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/cluster_manage_client by this push:
new e077467 add init
e077467 is described below
commit e0774675949011f6c0d45121aa228fa9ae2bfc98
Author: lta <li...@163.com>
AuthorDate: Thu Apr 18 21:27:14 2019 +0800
add init
---
.../org/apache/iotdb/cluster/entity/Server.java | 1 +
.../apache/iotdb/cluster/qp/ClusterQPExecutor.java | 9 +-
.../rpc/raft/impl/RaftNodeAsClientManager.java | 144 +++++++++++++++------
3 files changed, 103 insertions(+), 51 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
index a4c02cd..ed36c2f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/entity/Server.java
@@ -88,6 +88,7 @@ public class Server {
/** Stand-alone version of IoTDB, be careful to replace the internal JDBC Server with a cluster version **/
iotdb = new IoTDB();
iotdb.active();
+ CLIENT_MANAGER.init();
/** Init raft groups **/
PeerId[] peerIds = RaftUtils.convertStringArrayToPeerIdArray(CLUSTER_CONF.getNodes());
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
index 31be86c..b9debe3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/qp/ClusterQPExecutor.java
@@ -200,14 +200,7 @@ public abstract class ClusterQPExecutor {
* try to get raft rpc client
*/
private NodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
- NodeAsClient client = CLIENT_MANAGER.getRaftNodeAsClient();
- if (client == null) {
- throw new RaftConnectionException(String
- .format("Raft inner rpc clients have reached the max numbers %s",
- CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
- .getMaxQueueNumOfInnerRpcClient()));
- }
- return client;
+ return CLIENT_MANAGER.getRaftNodeAsClient();
}
/**
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
index 178d0d4..efd427b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/rpc/raft/impl/RaftNodeAsClientManager.java
@@ -23,14 +23,16 @@ import com.alipay.remoting.exception.RemotingException;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.CliOptions;
import com.alipay.sofa.jraft.rpc.impl.cli.BoltCliClientService;
+import java.util.LinkedList;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.iotdb.cluster.qp.callback.QPTask;
-import org.apache.iotdb.cluster.qp.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.RaftConnectionException;
+import org.apache.iotdb.cluster.qp.callback.QPTask;
+import org.apache.iotdb.cluster.qp.callback.QPTask.TaskState;
import org.apache.iotdb.cluster.rpc.raft.NodeAsClient;
import org.apache.iotdb.cluster.rpc.raft.request.BasicRequest;
import org.apache.iotdb.cluster.rpc.raft.response.BasicResponse;
@@ -62,16 +64,14 @@ public class RaftNodeAsClientManager {
private static final int MAX_QUEUE_CLIENT_NUM = CLUSTER_CONFIG.getMaxNumOfInnerRpcClient();
/**
- * RaftNodeAsClient singleton
+ * RaftNodeAsClient list
*/
- private final RaftNodeAsClient client = new RaftNodeAsClient();
-
- private boolean clientInited = false;
+ private final LinkedList<RaftNodeAsClient> clientList = new LinkedList<>();
/**
* Number of clients in use
*/
- private AtomicInteger validClientNum = new AtomicInteger(0);
+ private AtomicInteger clientNumInUse = new AtomicInteger(0);
/**
* Number of requests for clients in queue
@@ -79,64 +79,124 @@ public class RaftNodeAsClientManager {
private int queueClientNum = 0;
/**
- * Lock to update validClientNum
+ * Lock held while updating clientNumInUse, queueClientNum and clientList
*/
- private ReentrantLock numLock = new ReentrantLock();
+ private ReentrantLock resourceLock = new ReentrantLock();
+
+ /**
+ * Mark whether system is shutting down
+ */
+ private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private RaftNodeAsClientManager(){
}
+ public void init() {
+ isShuttingDown.set(false);
+ }
+
/**
- * Try to get client, return null if num of queue client exceeds threshold.
+ * Try to get a client; throws RaftConnectionException if the number of queued client requests has reached the threshold or the system is shutting down.
*/
- public RaftNodeAsClient getRaftNodeAsClient() {
+ public RaftNodeAsClient getRaftNodeAsClient() throws RaftConnectionException {
try {
- if (!clientInited) {
- client.init();
- }
-
- numLock.lock();
- if (validClientNum.get() < MAX_VALID_CLIENT_NUM) {
- validClientNum.incrementAndGet();
- return client;
- }
+ resourceLock.lock();
if (queueClientNum >= MAX_QUEUE_CLIENT_NUM) {
- return null;
+ throw new RaftConnectionException(String
+ .format("Raft inner rpc clients have reached the max numbers %s",
+ CLUSTER_CONFIG.getMaxNumOfInnerRpcClient() + CLUSTER_CONFIG
+ .getMaxQueueNumOfInnerRpcClient()));
+ }
+ checkShuttingDown();
+ if (clientNumInUse.get() < MAX_VALID_CLIENT_NUM) {
+ clientNumInUse.incrementAndGet();
+ return getClient();
}
queueClientNum++;
}finally {
- numLock.unlock();
+ resourceLock.unlock();
}
return tryToGetClient();
}
+ private void checkShuttingDown() throws RaftConnectionException {
+ if (isShuttingDown.get()) {
+ throw new RaftConnectionException(
+ "Reject to provide RaftNodeAsClient client because cluster system is shutting down");
+ }
+ }
+
/**
- * Check whether it can get the client
+ * Spin until the number of clients in use drops below the maximum, then take a client; fails if the system is shutting down
*/
- private RaftNodeAsClient tryToGetClient() {
+ private RaftNodeAsClient tryToGetClient() throws RaftConnectionException {
for(;;){
- if(validClientNum.get() < MAX_VALID_CLIENT_NUM){
+ if(clientNumInUse.get() < MAX_VALID_CLIENT_NUM){
+ resourceLock.lock();
try{
- numLock.lock();
- if(validClientNum.get() < MAX_VALID_CLIENT_NUM){
- validClientNum.incrementAndGet();
+ checkShuttingDown();
+ if(clientNumInUse.get() < MAX_VALID_CLIENT_NUM){
+ clientNumInUse.incrementAndGet();
queueClientNum--;
- return client;
+ return getClient();
}
- }finally {
- numLock.unlock();
+ } catch (RaftConnectionException e) {
+ queueClientNum--;
+ throw new RaftConnectionException(e);
+ } finally {
+ resourceLock.unlock();
}
}
}
}
- public void releaseClient() {
- validClientNum.decrementAndGet();
+ /**
+ * Not thread-safe: callers must hold resourceLock. Returns an idle client from clientList, or a new one if the list is empty
+ */
+ private RaftNodeAsClient getClient() {
+ if (clientList.isEmpty()) {
+ return new RaftNodeAsClient();
+ } else {
+ return clientList.removeFirst();
+ }
+ }
+
+ /**
+ * Release usage of a client
+ */
+ public void releaseClient(RaftNodeAsClient client) {
+ resourceLock.lock();
+ try {
+ clientNumInUse.decrementAndGet();
+ clientList.addLast(client);
+ } finally {
+ resourceLock.unlock();
+ }
}
public void shutdown(){
- client.shutdown();
+ // Reject new client requests from now on (see checkShuttingDown).
+ isShuttingDown.set(true);
+ // Keep waiting while ANY client is still in use OR any request is still queued.
+ // Using && here would stop waiting as soon as one counter hits zero, so clients
+ // released afterwards would be put back into clientList and never shut down.
+ while (clientNumInUse.get() != 0 || queueClientNum != 0){
+ // wait until releasing all usage of clients.
+ }
+ // All clients have been returned to the list; close each of them.
+ while(!clientList.isEmpty()){
+ clientList.removeFirst().shutdown();
+ }
+ }
+
+ /**
+ * Get client number in use
+ */
+ public int getClientNumInUse() {
+ return clientNumInUse.get();
+ }
+
+ /**
+ * Get client number in queue
+ */
+ public int getClientNumInQueue() {
+ return queueClientNum;
}
public static final RaftNodeAsClientManager getInstance() {
@@ -171,14 +231,13 @@ public class RaftNodeAsClientManager {
private void init(){
boltClientService = new BoltCliClientService();
boltClientService.init(new CliOptions());
- clientInited = true;
}
@Override
public void asyncHandleRequest(BasicRequest request, PeerId leader,
QPTask qpTask)
throws RaftConnectionException {
- LOGGER.debug("Node as client to send request to leader: {}", leader);
+ LOGGER.debug("Node as clientList to send request to leader: {}", leader);
try {
boltClientService.getRpcClient()
.invokeWithCallback(leader.getEndpoint().toString(), request,
@@ -187,15 +246,15 @@ public class RaftNodeAsClientManager {
@Override
public void onResponse(Object result) {
BasicResponse response = (BasicResponse) result;
- releaseClient();
+ releaseClient(RaftNodeAsClient.this);
qpTask.run(response);
}
@Override
public void onException(Throwable e) {
- LOGGER.error("Bolt rpc client occurs errors when handling Request", e);
+ LOGGER.error("Bolt rpc clientList occurs errors when handling Request", e);
qpTask.setTaskState(TaskState.EXCEPTION);
- releaseClient();
+ releaseClient(RaftNodeAsClient.this);
qpTask.run(null);
}
@@ -207,7 +266,7 @@ public class RaftNodeAsClientManager {
} catch (RemotingException | InterruptedException e) {
LOGGER.error(e.getMessage());
qpTask.setTaskState(TaskState.EXCEPTION);
- releaseClient();
+ releaseClient(this);
qpTask.run(null);
throw new RaftConnectionException(e);
}
@@ -226,17 +285,16 @@ public class RaftNodeAsClientManager {
qpTask.run(null);
throw new RaftConnectionException(e);
} finally {
- releaseClient();
+ releaseClient(this);
}
}
/**
- * Shut down client
+ * Shut down this client's bolt client service
*/
@Override
public void shutdown() {
boltClientService.shutdown();
- clientInited = false;
}
}