You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/06 05:56:00 UTC
[iotdb] 01/01: [IOTDB-5312] Consolidate ClientManagers in Datanodes for unified management (#8654)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch jira5312
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 7142f77f3e0596d86137b7ed84403d66987038b8
Author: Potato <ta...@apache.org>
AuthorDate: Fri Jan 6 13:31:05 2023 +0800
[IOTDB-5312] Consolidate ClientManagers in Datanodes for unified management (#8654)
* finish
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../client/sync/SyncConfigNodeClientPool.java | 5 +-
.../service/thrift/ConfigNodeRPCService.java | 2 +-
.../iot/client/AsyncIoTConsensusServiceClient.java | 73 +++++++------
.../iot/client/IoTConsensusClientPool.java | 10 +-
.../iot/client/SyncIoTConsensusServiceClient.java | 72 ++++--------
.../apache/iotdb/consensus/ratis/RatisClient.java | 15 +--
.../iotdb/consensus/ratis/RatisConsensus.java | 17 ++-
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 5 +-
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 5 +-
.../apache/iotdb/commons/client/ClientManager.java | 4 +
.../iotdb/commons/client/ClientPoolFactory.java | 121 ++++++++++++++++++---
.../apache/iotdb/commons/client/ThriftClient.java | 89 +++++++++++++++
.../AsyncConfigNodeHeartbeatServiceClient.java | 73 +++++++------
.../async/AsyncConfigNodeIServiceClient.java | 75 +++++++------
.../async/AsyncDataNodeHeartbeatServiceClient.java | 73 +++++++------
.../async/AsyncDataNodeInternalServiceClient.java | 73 +++++++------
.../AsyncDataNodeMPPDataExchangeServiceClient.java | 77 ++++++-------
.../BorrowNullClientManagerException.java} | 16 ++-
.../client/exception/ClientManagerException.java | 5 +
...ava => CreateTAsyncClientManagerException.java} | 7 +-
.../AsyncThriftClientFactory.java} | 42 +++----
.../client/{ => factory}/BaseClientFactory.java | 9 +-
.../ThriftClientFactory.java} | 16 ++-
.../client/{ => property}/ClientPoolProperty.java | 6 +-
.../ThriftClientProperty.java} | 12 +-
.../client/sync/SyncConfigNodeIServiceClient.java | 51 ++++-----
.../sync/SyncDataNodeInternalServiceClient.java | 67 ++++++------
.../SyncDataNodeMPPDataExchangeServiceClient.java | 57 +++++-----
.../sync/SyncThriftClientWithErrorHandler.java | 62 ++---------
.../apache/iotdb/commons/conf/CommonConfig.java | 43 +++++---
.../iotdb/commons/conf/CommonDescriptor.java | 51 ++++++++-
.../iotdb/commons/client/ClientManagerTest.java | 73 +++++++++----
.../iotdb/db/auth/ClusterAuthorityFetcher.java | 6 +-
.../apache/iotdb/db/client/ConfigNodeClient.java | 34 +++---
.../iotdb/db/client/ConfigNodeClientManager.java | 19 +++-
.../iotdb/db/client/DataNodeClientPoolFactory.java | 117 +-------------------
.../metadata/template/ClusterTemplateManager.java | 6 +-
.../execution/exchange/MPPDataExchangeService.java | 5 +-
.../org/apache/iotdb/db/mpp/plan/Coordinator.java | 10 +-
.../apache/iotdb/db/mpp/plan/TestRPCClient.java | 14 +--
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 5 +-
.../db/mpp/plan/analyze/cache/PartitionCache.java | 5 +-
.../config/executor/ClusterConfigTaskExecutor.java | 7 +-
.../db/sync/common/ClusterSyncInfoFetcher.java | 6 +-
.../db/trigger/executor/TriggerFireVisitor.java | 18 +--
.../trigger/service/TriggerInformationUpdater.java | 6 +-
.../exchange/MPPDataExchangeManagerTest.java | 8 +-
.../iotdb/db/mpp/plan/plan/QueryPlannerTest.java | 4 +-
48 files changed, 837 insertions(+), 739 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index 0d0f5336f3..cd19d0add8 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.client.sync;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -28,7 +29,6 @@ import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRestartReq;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -52,8 +52,7 @@ public class SyncConfigNodeClientPool {
private SyncConfigNodeClientPool() {
clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+ .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
configNodeLeader = new TEndPoint();
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
index e467d4b4bb..824b02b3e2 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCService.java
@@ -74,7 +74,7 @@ public class ConfigNodeRPCService extends ThriftService implements ConfigNodeRPC
configConf.getCnRpcMaxConcurrentClientNum(),
configConf.getThriftServerAwaitTimeForStopService(),
new ConfigNodeRPCServiceHandler(),
- commonConfig.isCnRpcThriftCompressionEnabled());
+ commonConfig.isRpcThriftCompressionEnabled());
} catch (RPCServiceException e) {
throw new IllegalAccessException(e.getMessage());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
index 1f08dabbc6..f84b583924 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/AsyncIoTConsensusServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient {
+public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncClient
+ implements ThriftClient {
private static final Logger logger =
LoggerFactory.getLogger(AsyncIoTConsensusServiceClient.class);
@@ -58,41 +60,44 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
this.clientManager = clientManager;
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- /**
- * return self if clientManager is not null, the method doesn't need to call by user, it will
- * trigger once client transport complete.
- */
- private void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
- }
- }
-
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onComplete() {
super.onComplete();
returnSelf();
}
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onError(Exception e) {
super.onError(e);
+ ThriftClient.resolveException(e, this);
returnSelf();
}
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ private void returnSelf() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
public boolean isReady() {
try {
checkReady();
@@ -109,13 +114,13 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
+ extends AsyncThriftClientFactory<TEndPoint, AsyncIoTConsensusServiceClient> {
public Factory(
ClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty, threadName);
+ super(clientManager, thriftClientProperty, threadName);
}
@Override
@@ -127,21 +132,19 @@ public class AsyncIoTConsensusServiceClient extends IoTConsensusIService.AsyncCl
@Override
public PooledObject<AsyncIoTConsensusServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
- TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
- tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
new AsyncIoTConsensusServiceClient(
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endPoint,
- tManager,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
}
@Override
public boolean validateObject(
TEndPoint endPoint, PooledObject<AsyncIoTConsensusServiceClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+ return pooledObject.getObject().isReady();
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
index babc9fb4f5..eddb1067fd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/IoTConsensusClientPool.java
@@ -20,10 +20,10 @@
package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.commons.pool2.KeyedObjectPool;
@@ -48,11 +48,9 @@ public class IoTConsensusClientPool {
return new GenericKeyedObjectPool<>(
new SyncIoTConsensusServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
+ new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(
- config.getRpc().getSelectorNumOfClientManager())
.build()),
new ClientPoolProperty.Builder<SyncIoTConsensusServiceClient>().build().getConfig());
}
@@ -74,7 +72,7 @@ public class IoTConsensusClientPool {
return new GenericKeyedObjectPool<>(
new AsyncIoTConsensusServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
+ new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(config.getRpc().getConnectionTimeoutInMs())
.setRpcThriftCompressionEnabled(config.getRpc().isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
index 06fb777810..638893b116 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/SyncIoTConsensusServiceClient.java
@@ -20,16 +20,14 @@
package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
-import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.iot.thrift.IoTConsensusIService;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
@@ -37,19 +35,16 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
-import java.lang.reflect.Constructor;
-import java.net.SocketException;
-
public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
- implements SyncThriftClient, AutoCloseable {
+ implements ThriftClient, AutoCloseable {
- private final TEndPoint endPoint;
+ private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager;
public SyncIoTConsensusServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endPoint,
+ TEndPoint endpoint,
ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager)
throws TTransportException {
super(
@@ -57,59 +52,41 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endPoint.getIp(),
- endPoint.getPort(),
+ endpoint.getIp(),
+ endpoint.getPort(),
connectionTimeout))));
- this.endPoint = endPoint;
+ this.endpoint = endpoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
- @TestOnly
- public TEndPoint getTEndpoint() {
- return endPoint;
- }
-
- @TestOnly
- public ClientManager<TEndPoint, SyncIoTConsensusServiceClient> getClientManager() {
- return clientManager;
- }
-
+ @Override
public void close() {
- if (clientManager != null) {
- clientManager.returnClient(endPoint, this);
- }
- }
-
- public void setTimeout(int timeout) {
- // the same transport is used in both input and output
- ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+ clientManager.returnClient(endpoint, this);
}
+ @Override
public void invalidate() {
getInputProtocol().getTransport().close();
}
@Override
public void invalidateAll() {
- clientManager.clear(endPoint);
- }
-
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ clientManager.clear(endpoint);
}
@Override
public String toString() {
- return String.format("SyncIoTConsensusServiceClient{%s}", endPoint);
+ return String.format("SyncIoTConsensusServiceClient{%s}", endpoint);
}
- public static class Factory extends BaseClientFactory<TEndPoint, SyncIoTConsensusServiceClient> {
+ public static class Factory
+ extends ThriftClientFactory<TEndPoint, SyncIoTConsensusServiceClient> {
public Factory(
ClientManager<TEndPoint, SyncIoTConsensusServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ThriftClientProperty thriftClientProperty) {
+ super(clientManager, thriftClientProperty);
}
@Override
@@ -121,15 +98,13 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
@Override
public PooledObject<SyncIoTConsensusServiceClient> makeObject(TEndPoint endpoint)
throws Exception {
- Constructor<SyncIoTConsensusServiceClient> constructor =
- SyncIoTConsensusServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
return new DefaultPooledObject<>(
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncIoTConsensusServiceClient.class,
- constructor,
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ SyncIoTConsensusServiceClient.class.getConstructor(
+ TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endpoint,
clientManager));
}
@@ -137,8 +112,7 @@ public class SyncIoTConsensusServiceClient extends IoTConsensusIService.Client
@Override
public boolean validateObject(
TEndPoint endpoint, PooledObject<SyncIoTConsensusServiceClient> pooledObject) {
- return pooledObject.getObject() != null
- && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+ return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
index 7f0b11d117..80e80232da 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClient.java
@@ -18,9 +18,8 @@
*/
package org.apache.iotdb.consensus.ratis;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.factory.BaseClientFactory;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.commons.pool2.PooledObject;
@@ -41,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
public class RatisClient {
+
private final Logger logger = LoggerFactory.getLogger(RatisClient.class);
private final RaftGroup serveGroup;
private final RaftClient raftClient;
@@ -59,7 +59,7 @@ public class RatisClient {
return raftClient;
}
- public void close() {
+ private void close() {
try {
raftClient.close();
} catch (IOException e) {
@@ -68,9 +68,7 @@ public class RatisClient {
}
public void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(serveGroup, this);
- }
+ clientManager.returnClient(serveGroup, this);
}
public static class Factory extends BaseClientFactory<RaftGroup, RatisClient> {
@@ -81,11 +79,10 @@ public class RatisClient {
public Factory(
ClientManager<RaftGroup, RatisClient> clientManager,
- ClientFactoryProperty clientPoolProperty,
RaftProperties raftProperties,
RaftClientRpc clientRpc,
Supplier<RatisConfig.Impl> config) {
- super(clientManager, clientPoolProperty);
+ super(clientManager);
this.raftProperties = raftProperties;
this.clientRpc = clientRpc;
this.config = config;
@@ -97,7 +94,7 @@ public class RatisClient {
}
@Override
- public PooledObject<RatisClient> makeObject(RaftGroup group) throws Exception {
+ public PooledObject<RatisClient> makeObject(RaftGroup group) {
return new DefaultPooledObject<>(
new RatisClient(
group,
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 4fd5c46e95..94be8ca634 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -21,12 +21,11 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -106,9 +105,7 @@ class RatisConsensus implements IConsensus {
private final RaftProperties properties = new RaftProperties();
private final RaftClientRpc clientRpc;
- private final IClientManager<RaftGroup, RatisClient> clientManager =
- new IClientManager.Factory<RaftGroup, RatisClient>()
- .createClientManager(new RatisClientPoolFactory());
+ private final IClientManager<RaftGroup, RatisClient> clientManager;
private final Map<RaftGroupId, RaftGroup> lastSeen = new ConcurrentHashMap<>();
@@ -145,6 +142,10 @@ class RatisConsensus implements IConsensus {
Utils.initRatisConfig(properties, config.getRatisConfig());
this.config = config.getRatisConfig();
+ clientManager =
+ new IClientManager.Factory<RaftGroup, RatisClient>()
+ .createClientManager(new RatisClientPoolFactory());
+
clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
server =
@@ -812,11 +813,7 @@ class RatisConsensus implements IConsensus {
ClientManager<RaftGroup, RatisClient> manager) {
return new GenericKeyedObjectPool<>(
new RatisClient.Factory(
- manager,
- new ClientFactoryProperty.Builder().build(),
- properties,
- clientRpc,
- MemoizedSupplier.valueOf(() -> config.getImpl())),
+ manager, properties, clientRpc, MemoizedSupplier.valueOf(config::getImpl)),
new ClientPoolProperty.Builder<RatisClient>().build().getConfig());
}
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index bd5bd2f247..af9d89866d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -19,13 +19,13 @@
package org.apache.iotdb.it.env;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.it.framework.IoTDBTestLogger;
@@ -81,8 +81,7 @@ public abstract class AbstractEnv implements BaseEnv {
clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+ .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
final String testClassName = getTestClassName();
final String testMethodName = getTestMethodName();
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index 20040842e2..0099fb11c2 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.it.env;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.jdbc.Config;
@@ -61,8 +61,7 @@ public class RemoteServerEnv implements BaseEnv {
}
clientManager =
new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+ .createClientManager(new ClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 66836967de..dc1f5fc83f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.commons.client;
+import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -45,6 +46,9 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
@Override
public V borrowClient(K node) throws ClientManagerException {
+ if (node == null) {
+ throw new BorrowNullClientManagerException();
+ }
try {
return pool.borrowObject(node);
} catch (Exception e) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 094470deb6..a795b20199 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -21,9 +21,15 @@ package org.apache.iotdb.commons.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.async.AsyncConfigNodeHeartbeatServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.client.async.AsyncDataNodeHeartbeatServiceClient;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -37,18 +43,58 @@ public class ClientPoolFactory {
private ClientPoolFactory() {}
+ public static class SyncConfigNodeIServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
+
+ @Override
+ public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
+ ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
+ return new GenericKeyedObjectPool<>(
+ new SyncConfigNodeIServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .build()),
+ new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
+ }
+ }
+
+ public static class AsyncConfigNodeIServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
+
+ @Override
+ public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
+ return new GenericKeyedObjectPool<>(
+ new AsyncConfigNodeIServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .build(),
+ ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
+ new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
+ .setMaxIdleClientForEachNode(conf.getMaxIdleClientForEachNode())
+ .setMaxTotalClientForEachNode(conf.getMaxTotalClientForEachNode())
+ .build()
+ .getConfig());
+ }
+ }
+
public static class SyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+
@Override
public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
}
@@ -56,16 +102,17 @@ public class ClientPoolFactory {
public static class AsyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+
@Override
public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
@@ -74,16 +121,17 @@ public class ClientPoolFactory {
public static class AsyncConfigNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+
@Override
public KeyedObjectPool<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncConfigNodeHeartbeatServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncConfigNodeHeartbeatServiceClient>()
@@ -94,16 +142,17 @@ public class ClientPoolFactory {
public static class AsyncDataNodeHeartbeatServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+
@Override
public KeyedObjectPool<TEndPoint, AsyncDataNodeHeartbeatServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeHeartbeatServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getCnConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isCnRpcThriftCompressionEnabled())
- .setSelectorNumOfAsyncClientManager(conf.getCnSelectorNumOfClientManager())
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build(),
ThreadName.ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeHeartbeatServiceClient>()
@@ -111,4 +160,44 @@ public class ClientPoolFactory {
.getConfig());
}
}
+
+ public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
+
+ @Override
+ public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
+ ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
+ return new GenericKeyedObjectPool<>(
+ new SyncDataNodeMPPDataExchangeServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .build()),
+ new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
+ .build()
+ .getConfig());
+ }
+ }
+
+ public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
+ implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
+
+ @Override
+ public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
+ ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
+ return new GenericKeyedObjectPool<>(
+ new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
+ manager,
+ new ThriftClientProperty.Builder()
+ .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
+ .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
+ .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .build(),
+ ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
+ new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
+ .build()
+ .getConfig());
+ }
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
new file mode 100644
index 0000000000..b8124f939f
--- /dev/null
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.net.SocketException;
+
+/**
+ * This class defines the failed interfaces that thrift client needs to support so that the Thrift
+ * Client can clean up the clientManager when it receives the corresponding exception.
+ */
+public interface ThriftClient {
+
+ Logger logger = LoggerFactory.getLogger(ThriftClient.class);
+
+ /** Close this connection. */
+ void invalidate();
+
+ /** Removing all pooled instances corresponding to current instance's endpoint. */
+ void invalidateAll();
+
+ static void resolveException(Throwable t, ThriftClient o) {
+ Throwable origin = t;
+ if (t instanceof InvocationTargetException) {
+ origin = ((InvocationTargetException) t).getTargetException();
+ }
+ Throwable cur = origin;
+ if (cur instanceof TException) {
+ int level = 0;
+ while (cur != null) {
+ logger.debug(
+ "level-{} Exception class {}, message {}",
+ level,
+ cur.getClass().getName(),
+ cur.getMessage());
+ cur = cur.getCause();
+ level++;
+ }
+ o.invalidate();
+ }
+
+ Throwable rootCause = ExceptionUtils.getRootCause(origin);
+ if (rootCause != null) {
+ // if the exception is SocketException and its error message is Broken pipe, it means that
+ // the remote node may restart and all the connection we cached before should be cleared.
+ logger.debug(
+ "root cause message {}, LocalizedMessage {}, ",
+ rootCause.getMessage(),
+ rootCause.getLocalizedMessage(),
+ rootCause);
+ if (isConnectionBroken(rootCause)) {
+ logger.debug(
+ "Broken pipe error happened in sending RPC,"
+ + " we need to clear all previous cached connection",
+ t);
+ o.invalidateAll();
+ }
+ }
+ }
+
+ static boolean isConnectionBroken(Throwable cause) {
+ return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
+ || (cause instanceof TTransportException
+ && cause.getMessage().contains("Socket is closed by peer"));
+ }
+}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
index 38b12d2370..86de8b6f02 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeHeartbeatServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -33,7 +34,8 @@ import org.apache.thrift.protocol.TProtocolFactory;
import java.io.IOException;
-public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient {
+public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService.AsyncClient
+ implements ThriftClient {
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager;
@@ -53,41 +55,44 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
this.clientManager = clientManager;
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- /**
- * return self if clientManager is not null, the method doesn't need to call by user, it will
- * trigger once client transport complete.
- */
- private void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
- }
- }
-
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onComplete() {
super.onComplete();
returnSelf();
}
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onError(Exception e) {
super.onError(e);
+ ThriftClient.resolveException(e, this);
returnSelf();
}
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ private void returnSelf() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
public boolean isReady() {
try {
checkReady();
@@ -103,13 +108,13 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
+ extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> {
public Factory(
ClientManager<TEndPoint, AsyncConfigNodeHeartbeatServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty, threadName);
+ super(clientManager, thriftClientProperty, threadName);
}
@Override
@@ -121,21 +126,19 @@ public class AsyncConfigNodeHeartbeatServiceClient extends IConfigNodeRPCService
@Override
public PooledObject<AsyncConfigNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
- TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
- tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
new AsyncConfigNodeHeartbeatServiceClient(
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endPoint,
- tManager,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
}
@Override
public boolean validateObject(
TEndPoint endPoint, PooledObject<AsyncConfigNodeHeartbeatServiceClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+ return pooledObject.getObject().isReady();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
index 32219dcbfb..91d64ab257 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncConfigNodeIServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncClient {
+public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncClient
+ implements ThriftClient {
private static final Logger logger = LoggerFactory.getLogger(AsyncConfigNodeIServiceClient.class);
@@ -57,47 +59,50 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
this.clientManager = clientManager;
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- /**
- * return self if clientManager is not null, the method doesn't need to call by user, it will
- * trigger once client transport complete.
- */
- private void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
- }
- }
-
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onComplete() {
super.onComplete();
returnSelf();
}
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onError(Exception e) {
super.onError(e);
+ ThriftClient.resolveException(e, this);
returnSelf();
}
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ private void returnSelf() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
public boolean isReady() {
try {
checkReady();
return true;
} catch (Exception e) {
- logger.info("Unexpected exception occurs in {} :", this, e);
+ logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
return false;
}
}
@@ -108,13 +113,13 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
+ extends AsyncThriftClientFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
public Factory(
ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty, threadName);
+ super(clientManager, thriftClientProperty, threadName);
}
@Override
@@ -126,21 +131,19 @@ public class AsyncConfigNodeIServiceClient extends IConfigNodeRPCService.AsyncCl
@Override
public PooledObject<AsyncConfigNodeIServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
- TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
- tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
new AsyncConfigNodeIServiceClient(
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endPoint,
- tManager,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
}
@Override
public boolean validateObject(
TEndPoint endPoint, PooledObject<AsyncConfigNodeIServiceClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+ return pooledObject.getObject().isReady();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
index 2812886336..50c0540fd0 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeHeartbeatServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -33,7 +34,8 @@ import org.apache.thrift.protocol.TProtocolFactory;
import java.io.IOException;
-public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient {
+public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.AsyncClient
+ implements ThriftClient {
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager;
@@ -53,41 +55,44 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
this.clientManager = clientManager;
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- /**
- * return self if clientManager is not null, the method doesn't need to call by user, it will
- * trigger once client transport complete.
- */
- private void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
- }
- }
-
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onComplete() {
super.onComplete();
returnSelf();
}
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onError(Exception e) {
super.onError(e);
+ ThriftClient.resolveException(e, this);
returnSelf();
}
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ private void returnSelf() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
public boolean isReady() {
try {
checkReady();
@@ -103,13 +108,13 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
+ extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeHeartbeatServiceClient> {
public Factory(
ClientManager<TEndPoint, AsyncDataNodeHeartbeatServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty, threadName);
+ super(clientManager, thriftClientProperty, threadName);
}
@Override
@@ -121,21 +126,19 @@ public class AsyncDataNodeHeartbeatServiceClient extends IDataNodeRPCService.Asy
@Override
public PooledObject<AsyncDataNodeHeartbeatServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
- TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
- tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
new AsyncDataNodeHeartbeatServiceClient(
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endPoint,
- tManager,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
}
@Override
public boolean validateObject(
TEndPoint endPoint, PooledObject<AsyncDataNodeHeartbeatServiceClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+ return pooledObject.getObject().isReady();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 5329a4d9f2..46c6eec275 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -36,7 +37,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.AsyncClient {
+public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.AsyncClient
+ implements ThriftClient {
private static final Logger logger =
LoggerFactory.getLogger(AsyncDataNodeInternalServiceClient.class);
@@ -69,41 +71,44 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
return clientManager;
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- /**
- * return self if clientManager is not null, the method doesn't need to call by user, it will
- * trigger once client transport complete.
- */
- private void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
- }
- }
-
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onComplete() {
super.onComplete();
returnSelf();
}
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onError(Exception e) {
super.onError(e);
+ ThriftClient.resolveException(e, this);
returnSelf();
}
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ private void returnSelf() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
public boolean isReady() {
try {
checkReady();
@@ -120,13 +125,13 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+ extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
public Factory(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty, threadName);
+ super(clientManager, thriftClientProperty, threadName);
}
@Override
@@ -138,21 +143,19 @@ public class AsyncDataNodeInternalServiceClient extends IDataNodeRPCService.Asyn
@Override
public PooledObject<AsyncDataNodeInternalServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
- TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
- tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
new AsyncDataNodeInternalServiceClient(
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endPoint,
- tManager,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
}
@Override
public boolean validateObject(
TEndPoint endPoint, PooledObject<AsyncDataNodeInternalServiceClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+ return pooledObject.getObject().isReady();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
index 667f3189b1..7b056fc350 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeMPPDataExchangeServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.async;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.AsyncBaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
@@ -35,7 +36,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
-public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.AsyncClient {
+public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.AsyncClient
+ implements ThriftClient {
private static final Logger logger =
LoggerFactory.getLogger(AsyncDataNodeMPPDataExchangeServiceClient.class);
@@ -58,47 +60,50 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
this.clientManager = clientManager;
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
- }
-
- /**
- * return self if clientManager is not null, the method doesn't need to call by user, it will
- * trigger once client transport complete.
- */
- private void returnSelf() {
- if (clientManager != null) {
- clientManager.returnClient(endpoint, this);
- }
- }
-
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onComplete() {
super.onComplete();
returnSelf();
}
- /**
- * This method will be automatically called by the thrift selector thread, and we'll just simulate
- * the behavior in our test
- */
@Override
public void onError(Exception e) {
super.onError(e);
+ ThriftClient.resolveException(e, this);
returnSelf();
}
- public boolean isReady() {
+ @Override
+ public void invalidate() {
+ if (!hasError()) {
+ super.onError(new Exception("This client has been invalidated"));
+ }
+ }
+
+ @Override
+ public void invalidateAll() {
+ clientManager.clear(endpoint);
+ }
+
+ /**
+ * return self, the method doesn't need to be called by the user and will be triggered after the
+ * RPC is finished.
+ */
+ private void returnSelf() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ private void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
+ private boolean isReady() {
try {
checkReady();
return true;
} catch (Exception e) {
- logger.info("Unexpected exception occurs in {} :", this, e);
+ logger.error("Unexpected exception occurs in {} : {}", this, e.getMessage());
return false;
}
}
@@ -109,13 +114,13 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
}
public static class Factory
- extends AsyncBaseClientFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
+ extends AsyncThriftClientFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
public Factory(
ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty, threadName);
+ super(clientManager, thriftClientProperty, threadName);
}
@Override
@@ -127,21 +132,19 @@ public class AsyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSe
@Override
public PooledObject<AsyncDataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endPoint)
throws Exception {
- TAsyncClientManager tManager = tManagers[clientCnt.incrementAndGet() % tManagers.length];
- tManager = tManager == null ? new TAsyncClientManager() : tManager;
return new DefaultPooledObject<>(
new AsyncDataNodeMPPDataExchangeServiceClient(
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endPoint,
- tManager,
+ tManagers[clientCnt.incrementAndGet() % tManagers.length],
clientManager));
}
@Override
public boolean validateObject(
TEndPoint endPoint, PooledObject<AsyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().isReady();
+ return pooledObject.getObject().isReady();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/BorrowNullClientManagerException.java
similarity index 74%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/BorrowNullClientManagerException.java
index ff3862f879..4cdf92fa4a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/BorrowNullClientManagerException.java
@@ -16,16 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.commons.client.sync;
-public interface SyncThriftClient {
+package org.apache.iotdb.commons.client.exception;
- /** close the connection */
- void invalidate();
+public class BorrowNullClientManagerException extends ClientManagerException {
- /**
- * Clears the specified pool, removing all pooled instances corresponding to current instance's
- * endPoint.
- */
- void invalidateAll();
+ private static final String MESSAGE = "Can not borrow client for node null";
+
+ public BorrowNullClientManagerException() {
+ super(MESSAGE);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
index 439f25b655..7da6297305 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
@@ -20,7 +20,12 @@
package org.apache.iotdb.commons.client.exception;
public class ClientManagerException extends Exception {
+
public ClientManagerException(Exception exception) {
super(exception);
}
+
+ public ClientManagerException(String message) {
+ super(message);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/CreateTAsyncClientManagerException.java
similarity index 82%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/CreateTAsyncClientManagerException.java
index 439f25b655..075e0cfb11 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/CreateTAsyncClientManagerException.java
@@ -19,8 +19,9 @@
package org.apache.iotdb.commons.client.exception;
-public class ClientManagerException extends Exception {
- public ClientManagerException(Exception exception) {
- super(exception);
+public class CreateTAsyncClientManagerException extends RuntimeException {
+
+ public CreateTAsyncClientManagerException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
similarity index 50%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
index 73fd0dca22..afde802f4c 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/AsyncThriftClientFactory.java
@@ -17,41 +17,41 @@
* under the License.
*/
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.factory;
+
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.exception.CreateTAsyncClientManagerException;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.thrift.async.TAsyncClientManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> {
+public abstract class AsyncThriftClientFactory<K, V> extends ThriftClientFactory<K, V> {
- private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
- protected TAsyncClientManager[] tManagers;
- protected AtomicInteger clientCnt = new AtomicInteger();
+ protected final TAsyncClientManager[] tManagers;
+ protected final AtomicInteger clientCnt = new AtomicInteger();
private static final String THRIFT_THREAD_NAME = "TAsyncClientManager#SelectorThread";
- protected AsyncBaseClientFactory(
+ protected AsyncThriftClientFactory(
ClientManager<K, V> clientManager,
- ClientFactoryProperty clientFactoryProperty,
+ ThriftClientProperty thriftClientProperty,
String threadName) {
- super(clientManager, clientFactoryProperty);
- synchronized (this) {
- tManagers = new TAsyncClientManager[clientFactoryProperty.getSelectorNumOfAsyncClientPool()];
+ super(clientManager, thriftClientProperty);
+ try {
+ tManagers = new TAsyncClientManager[thriftClientProperty.getSelectorNumOfAsyncClientPool()];
for (int i = 0; i < tManagers.length; i++) {
- try {
- tManagers[i] = new TAsyncClientManager();
- } catch (IOException e) {
- logger.error("Cannot create Async client factory", e);
- }
+ tManagers[i] = new TAsyncClientManager();
}
- Thread.getAllStackTraces().keySet().stream()
- .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
- .collect(Collectors.toList())
- .forEach(thread -> thread.setName(threadName + "-selector" + "-" + thread.getId()));
+ } catch (IOException e) {
+ throw new CreateTAsyncClientManagerException(
+ String.format("Cannot create Async thrift client factory %s", threadName), e);
}
+ Thread.getAllStackTraces().keySet().stream()
+ .filter(thread -> thread.getName().contains(THRIFT_THREAD_NAME))
+ .collect(Collectors.toList())
+ .forEach(thread -> thread.setName(threadName + "-selector-" + thread.getId()));
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
similarity index 82%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
index 510d495cae..680aa2efcd 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/BaseClientFactory.java
@@ -17,7 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.factory;
+
+import org.apache.iotdb.commons.client.ClientManager;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
@@ -25,12 +27,9 @@ import org.apache.commons.pool2.PooledObject;
public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> {
protected ClientManager<K, V> clientManager;
- protected ClientFactoryProperty clientFactoryProperty;
- protected BaseClientFactory(
- ClientManager<K, V> clientManager, ClientFactoryProperty clientFactoryProperty) {
+ protected BaseClientFactory(ClientManager<K, V> clientManager) {
this.clientManager = clientManager;
- this.clientFactoryProperty = clientFactoryProperty;
}
@Override
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
similarity index 61%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
index 439f25b655..8c6c178279 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/factory/ThriftClientFactory.java
@@ -17,10 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.commons.client.exception;
+package org.apache.iotdb.commons.client.factory;
-public class ClientManagerException extends Exception {
- public ClientManagerException(Exception exception) {
- super(exception);
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
+
+public abstract class ThriftClientFactory<K, V> extends BaseClientFactory<K, V> {
+
+ protected ThriftClientProperty thriftClientProperty;
+
+ protected ThriftClientFactory(
+ ClientManager<K, V> clientManager, ThriftClientProperty thriftClientProperty) {
+ super(clientManager);
+ this.thriftClientProperty = thriftClientProperty;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
similarity index 94%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
index 45870e9ba6..fc9ae843f9 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ClientPoolProperty.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.property;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
@@ -84,7 +84,7 @@ public class ClientPoolProperty<V> {
private DefaultProperty() {}
public static final long WAIT_CLIENT_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
- public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 100;
- public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 100;
+ public static final int MAX_TOTAL_CLIENT_FOR_EACH_NODE = 300;
+ public static final int MAX_IDLE_CLIENT_FOR_EACH_NODE = 200;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
similarity index 93%
rename from node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
rename to node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
index 363decbf2d..c54c23fbd7 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/property/ThriftClientProperty.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.property;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -25,13 +25,13 @@ import org.apache.thrift.protocol.TProtocolFactory;
import java.util.concurrent.TimeUnit;
-public class ClientFactoryProperty {
+public class ThriftClientProperty {
private final TProtocolFactory protocolFactory;
private final int connectionTimeoutMs;
private final int selectorNumOfAsyncClientPool;
- public ClientFactoryProperty(
+ public ThriftClientProperty(
TProtocolFactory protocolFactory, int connectionTimeoutMs, int selectorNumOfAsyncClientPool) {
this.protocolFactory = protocolFactory;
this.connectionTimeoutMs = connectionTimeoutMs;
@@ -75,8 +75,8 @@ public class ClientFactoryProperty {
return this;
}
- public ClientFactoryProperty build() {
- return new ClientFactoryProperty(
+ public ThriftClientProperty build() {
+ return new ThriftClientProperty(
rpcThriftCompressionEnabled
? new TCompactProtocol.Factory()
: new TBinaryProtocol.Factory(),
@@ -90,7 +90,7 @@ public class ClientFactoryProperty {
private DefaultProperty() {}
public static final boolean RPC_THRIFT_COMPRESSED_ENABLED = false;
- public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(30);
+ public static final int CONNECTION_TIMEOUT_MS = (int) TimeUnit.SECONDS.toMillis(20);
public static final int SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER = 1;
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
index 522470d8be..6f6024ccd2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncConfigNodeIServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
@@ -34,13 +35,12 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
-import java.lang.reflect.Constructor;
import java.net.SocketException;
public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
- implements SyncThriftClient, AutoCloseable {
+ implements ThriftClient, AutoCloseable {
- private final TEndPoint endPoint;
+ private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
public SyncConfigNodeIServiceClient(
@@ -57,15 +57,13 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
endPoint.getIp(),
endPoint.getPort(),
connectionTimeout))));
- this.endPoint = endPoint;
+ this.endpoint = endPoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
- public void close() {
- if (clientManager != null) {
- clientManager.returnClient(endPoint, this);
- }
+ public int getTimeout() throws SocketException {
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
public void setTimeout(int timeout) {
@@ -73,30 +71,32 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
}
+ @Override
+ public void close() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ @Override
public void invalidate() {
getInputProtocol().getTransport().close();
}
@Override
public void invalidateAll() {
- clientManager.clear(endPoint);
- }
-
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ clientManager.clear(endpoint);
}
@Override
public String toString() {
- return String.format("SyncConfigNodeIServiceClient{%s}", endPoint);
+ return String.format("SyncConfigNodeIServiceClient{%s}", endpoint);
}
- public static class Factory extends BaseClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
+ public static class Factory extends ThriftClientFactory<TEndPoint, SyncConfigNodeIServiceClient> {
public Factory(
ClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ThriftClientProperty thriftClientProperty) {
+ super(clientManager, thriftClientProperty);
}
@Override
@@ -108,15 +108,13 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
@Override
public PooledObject<SyncConfigNodeIServiceClient> makeObject(TEndPoint endpoint)
throws Exception {
- Constructor<SyncConfigNodeIServiceClient> constructor =
- SyncConfigNodeIServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
return new DefaultPooledObject<>(
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncConfigNodeIServiceClient.class,
- constructor,
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ SyncConfigNodeIServiceClient.class.getConstructor(
+ TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endpoint,
clientManager));
}
@@ -124,8 +122,7 @@ public class SyncConfigNodeIServiceClient extends IConfigNodeRPCService.Client
@Override
public boolean validateObject(
TEndPoint endpoint, PooledObject<SyncConfigNodeIServiceClient> pooledObject) {
- return pooledObject.getObject() != null
- && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+ return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index 3d992c5b48..cab879e08a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
import org.apache.iotdb.rpc.RpcTransportFactory;
@@ -35,19 +36,18 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
-import java.lang.reflect.Constructor;
import java.net.SocketException;
public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Client
- implements SyncThriftClient, AutoCloseable {
+ implements ThriftClient, AutoCloseable {
- private final TEndPoint endPoint;
+ private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager;
public SyncDataNodeInternalServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endPoint,
+ TEndPoint endpoint,
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager)
throws TTransportException {
super(
@@ -55,17 +55,26 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endPoint.getIp(),
- endPoint.getPort(),
+ endpoint.getIp(),
+ endpoint.getPort(),
connectionTimeout))));
- this.endPoint = endPoint;
+ this.endpoint = endpoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
+ public int getTimeout() throws SocketException {
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ }
+
+ public void setTimeout(int timeout) {
+ // the same transport is used in both input and output
+ ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+ }
+
@TestOnly
public TEndPoint getTEndpoint() {
- return endPoint;
+ return endpoint;
}
@TestOnly
@@ -73,42 +82,33 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
return clientManager;
}
+ @Override
public void close() {
- if (clientManager != null) {
- clientManager.returnClient(endPoint, this);
- }
- }
-
- public void setTimeout(int timeout) {
- // the same transport is used in both input and output
- ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+ clientManager.returnClient(endpoint, this);
}
+ @Override
public void invalidate() {
getInputProtocol().getTransport().close();
}
@Override
public void invalidateAll() {
- clientManager.clear(endPoint);
- }
-
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ clientManager.clear(endpoint);
}
@Override
public String toString() {
- return String.format("SyncDataNodeInternalServiceClient{%s}", endPoint);
+ return String.format("SyncDataNodeInternalServiceClient{%s}", endpoint);
}
public static class Factory
- extends BaseClientFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+ extends ThriftClientFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
public Factory(
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ThriftClientProperty thriftClientProperty) {
+ super(clientManager, thriftClientProperty);
}
@Override
@@ -120,15 +120,13 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
@Override
public PooledObject<SyncDataNodeInternalServiceClient> makeObject(TEndPoint endpoint)
throws Exception {
- Constructor<SyncDataNodeInternalServiceClient> constructor =
- SyncDataNodeInternalServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
return new DefaultPooledObject<>(
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncDataNodeInternalServiceClient.class,
- constructor,
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ SyncDataNodeInternalServiceClient.class.getConstructor(
+ TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endpoint,
clientManager));
}
@@ -136,8 +134,7 @@ public class SyncDataNodeInternalServiceClient extends IDataNodeRPCService.Clien
@Override
public boolean validateObject(
TEndPoint endpoint, PooledObject<SyncDataNodeInternalServiceClient> pooledObject) {
- return pooledObject.getObject() != null
- && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+ return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
index 7bcbd05071..dd3474e865 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeMPPDataExchangeServiceClient.java
@@ -20,9 +20,10 @@
package org.apache.iotdb.commons.client.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.mpp.rpc.thrift.MPPDataExchangeService;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
@@ -34,19 +35,18 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
-import java.lang.reflect.Constructor;
import java.net.SocketException;
public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeService.Client
- implements SyncThriftClient, AutoCloseable {
+ implements ThriftClient, AutoCloseable {
- private final TEndPoint endPoint;
+ private final TEndPoint endpoint;
private final ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager;
public SyncDataNodeMPPDataExchangeServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
- TEndPoint endPoint,
+ TEndPoint endpoint,
ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager)
throws TTransportException {
super(
@@ -54,18 +54,16 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
RpcTransportFactory.INSTANCE.getTransport(
new TSocket(
TConfigurationConst.defaultTConfiguration,
- endPoint.getIp(),
- endPoint.getPort(),
+ endpoint.getIp(),
+ endpoint.getPort(),
connectionTimeout))));
- this.endPoint = endPoint;
+ this.endpoint = endpoint;
this.clientManager = clientManager;
getInputProtocol().getTransport().open();
}
- public void close() {
- if (clientManager != null) {
- clientManager.returnClient(endPoint, this);
- }
+ public int getTimeout() throws SocketException {
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
public void setTimeout(int timeout) {
@@ -73,31 +71,33 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
}
+ @Override
+ public void close() {
+ clientManager.returnClient(endpoint, this);
+ }
+
+ @Override
public void invalidate() {
getInputProtocol().getTransport().close();
}
@Override
public void invalidateAll() {
- clientManager.clear(endPoint);
- }
-
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ clientManager.clear(endpoint);
}
@Override
public String toString() {
- return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endPoint);
+ return String.format("SyncDataNodeMPPDataExchangeServiceClient{%s}", endpoint);
}
public static class Factory
- extends BaseClientFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
+ extends ThriftClientFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
public Factory(
ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ThriftClientProperty thriftClientProperty) {
+ super(clientManager, thriftClientProperty);
}
@Override
@@ -109,15 +109,13 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
@Override
public PooledObject<SyncDataNodeMPPDataExchangeServiceClient> makeObject(TEndPoint endpoint)
throws Exception {
- Constructor<SyncDataNodeMPPDataExchangeServiceClient> constructor =
- SyncDataNodeMPPDataExchangeServiceClient.class.getConstructor(
- TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass());
return new DefaultPooledObject<>(
SyncThriftClientWithErrorHandler.newErrorHandler(
SyncDataNodeMPPDataExchangeServiceClient.class,
- constructor,
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ SyncDataNodeMPPDataExchangeServiceClient.class.getConstructor(
+ TProtocolFactory.class, int.class, endpoint.getClass(), clientManager.getClass()),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
endpoint,
clientManager));
}
@@ -125,8 +123,7 @@ public class SyncDataNodeMPPDataExchangeServiceClient extends MPPDataExchangeSer
@Override
public boolean validateObject(
TEndPoint endpoint, PooledObject<SyncDataNodeMPPDataExchangeServiceClient> pooledObject) {
- return pooledObject.getObject() != null
- && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
+ return pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
index d864984bc9..32c75da38f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncThriftClientWithErrorHandler.java
@@ -18,26 +18,24 @@
*/
package org.apache.iotdb.commons.client.sync;
+import org.apache.iotdb.commons.client.ThriftClient;
+
import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.net.SocketException;
public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(SyncThriftClientWithErrorHandler.class);
-
- public static <V extends SyncThriftClient> V newErrorHandler(
+ /**
+ * Note: The caller needs to ensure that the constructor corresponds to the class, or the cast
+ * might fail.
+ */
+ @SuppressWarnings("unchecked")
+ public static <V extends ThriftClient> V newErrorHandler(
Class<V> targetClass, Constructor<V> constructor, Object... args) {
Enhancer enhancer = new Enhancer();
enhancer.setSuperclass(targetClass);
@@ -54,51 +52,9 @@ public class SyncThriftClientWithErrorHandler implements MethodInterceptor {
try {
return methodProxy.invokeSuper(o, objects);
} catch (Throwable t) {
- Throwable origin = t;
- if (t instanceof InvocationTargetException) {
- origin = ((InvocationTargetException) t).getTargetException();
- }
- Throwable cur = origin;
- if (cur instanceof TException) {
- int level = 0;
- while (cur != null) {
- LOGGER.debug(
- "level-{} Exception class {}, message {}",
- level,
- cur.getClass().getName(),
- cur.getMessage());
- cur = cur.getCause();
- level++;
- }
- ((SyncThriftClient) o).invalidate();
- }
-
- Throwable rootCause = ExceptionUtils.getRootCause(origin);
- if (rootCause != null) {
- // if the exception is SocketException and its error message is Broken pipe, it means that
- // the remote node may restart and all the connection we cached before should be cleared.
- LOGGER.debug(
- "root cause message {}, LocalizedMessage {}, ",
- rootCause.getMessage(),
- rootCause.getLocalizedMessage(),
- rootCause);
- if (isConnectionBroken(rootCause)) {
- LOGGER.debug(
- "Broken pipe error happened in calling method {}, we need to clear all previous cached connection, err: {}",
- method.getName(),
- t);
- ((SyncThriftClient) o).invalidate();
- ((SyncThriftClient) o).invalidateAll();
- }
- }
+ ThriftClient.resolveException(t, (ThriftClient) o);
throw new TException(
"Error in calling method " + method.getName() + ", because: " + t.getMessage(), t);
}
}
-
- private boolean isConnectionBroken(Throwable cause) {
- return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe"))
- || (cause instanceof TTransportException
- && cause.getMessage().contains("Socket is closed by peer"));
- }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 3b3b3359f1..b930ea22e3 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -99,13 +99,14 @@ public class CommonConfig {
* ClientManager will have so many selector threads (TAsyncClientManager) to distribute to its
* clients.
*/
- private int selectorNumOfClientManager =
- Runtime.getRuntime().availableProcessors() / 4 > 0
- ? Runtime.getRuntime().availableProcessors() / 4
- : 1;
+ private int selectorNumOfClientManager = 1;
/** whether to use thrift compression. */
- private boolean isCnRpcThriftCompressionEnabled = false;
+ private boolean isRpcThriftCompressionEnabled = false;
+
+ private int maxTotalClientForEachNode = 300;
+
+ private int maxIdleClientForEachNode = 200;
/** What will the system do when unrecoverable error occurs. */
private HandleSystemErrorStrategy handleSystemErrorStrategy =
@@ -246,28 +247,44 @@ public class CommonConfig {
this.defaultTTLInMs = defaultTTLInMs;
}
- public int getCnConnectionTimeoutInMS() {
+ public int getConnectionTimeoutInMS() {
return connectionTimeoutInMS;
}
- public void setCnConnectionTimeoutInMS(int connectionTimeoutInMS) {
+ public void setConnectionTimeoutInMS(int connectionTimeoutInMS) {
this.connectionTimeoutInMS = connectionTimeoutInMS;
}
- public int getCnSelectorNumOfClientManager() {
+ public int getSelectorNumOfClientManager() {
return selectorNumOfClientManager;
}
- public void setCnSelectorNumOfClientManager(int selectorNumOfClientManager) {
+ public void setSelectorNumOfClientManager(int selectorNumOfClientManager) {
this.selectorNumOfClientManager = selectorNumOfClientManager;
}
- public boolean isCnRpcThriftCompressionEnabled() {
- return isCnRpcThriftCompressionEnabled;
+ public boolean isRpcThriftCompressionEnabled() {
+ return isRpcThriftCompressionEnabled;
+ }
+
+ public void setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+ isRpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
+ }
+
+ public int getMaxTotalClientForEachNode() {
+ return maxTotalClientForEachNode;
+ }
+
+ public void setMaxTotalClientForEachNode(int maxTotalClientForEachNode) {
+ this.maxTotalClientForEachNode = maxTotalClientForEachNode;
+ }
+
+ public int getMaxIdleClientForEachNode() {
+ return maxIdleClientForEachNode;
}
- public void setCnRpcThriftCompressionEnabled(boolean cnRpcThriftCompressionEnabled) {
- isCnRpcThriftCompressionEnabled = cnRpcThriftCompressionEnabled;
+ public void setMaxIdleClientForEachNode(int maxIdleClientForEachNode) {
+ this.maxIdleClientForEachNode = maxIdleClientForEachNode;
}
HandleSystemErrorStrategy getHandleSystemErrorStrategy() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 6ef3e0cc01..f53521f77b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -93,27 +93,66 @@ public class CommonDescriptor {
.trim()
.split(","));
- config.setCnRpcThriftCompressionEnabled(
+ config.setRpcThriftCompressionEnabled(
Boolean.parseBoolean(
properties
.getProperty(
"cn_rpc_thrift_compression_enable",
- String.valueOf(config.isCnRpcThriftCompressionEnabled()))
+ String.valueOf(config.isRpcThriftCompressionEnabled()))
.trim()));
- config.setCnConnectionTimeoutInMS(
+ config.setConnectionTimeoutInMS(
Integer.parseInt(
properties
.getProperty(
- "cn_connection_timeout_ms", String.valueOf(config.getCnConnectionTimeoutInMS()))
+ "cn_connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))
.trim()));
- config.setCnSelectorNumOfClientManager(
+ config.setSelectorNumOfClientManager(
Integer.parseInt(
properties
.getProperty(
"cn_selector_thread_nums_of_client_manager",
- String.valueOf(config.getCnSelectorNumOfClientManager()))
+ String.valueOf(config.getSelectorNumOfClientManager()))
+ .trim()));
+
+ config.setConnectionTimeoutInMS(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "dn_connection_timeout_ms", String.valueOf(config.getConnectionTimeoutInMS()))
+ .trim()));
+
+ config.setRpcThriftCompressionEnabled(
+ Boolean.parseBoolean(
+ properties
+ .getProperty(
+ "dn_rpc_thrift_compression_enable",
+ String.valueOf(config.isRpcThriftCompressionEnabled()))
+ .trim()));
+
+ config.setSelectorNumOfClientManager(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "dn_selector_thread_nums_of_client_manager",
+ String.valueOf(config.getSelectorNumOfClientManager()))
+ .trim()));
+
+ config.setMaxTotalClientForEachNode(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "dn_max_connection_for_internal_service",
+ String.valueOf(config.getMaxTotalClientForEachNode()))
+ .trim()));
+
+ config.setMaxIdleClientForEachNode(
+ Integer.parseInt(
+ properties
+ .getProperty(
+ "dn_core_connection_for_internal_service",
+ String.valueOf(config.getMaxIdleClientForEachNode()))
.trim()));
config.setHandleSystemErrorStrategy(
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 1375423486..061fd0f775 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -21,9 +21,13 @@ package org.apache.iotdb.commons.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.exception.BorrowNullClientManagerException;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.mock.MockInternalRPCService;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.mpp.rpc.thrift.IDataNodeRPCService;
@@ -61,20 +65,22 @@ public class ClientManagerTest {
/**
* We put all tests together to avoid frequent restarts of thrift Servers, which can cause "bind
* address already used" problems in macOS CI environments. The reason for this may be about this
- * [blog](https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux)
+ * <a
+ * href="https://stackoverflow.com/questions/51998042/macos-so-reuseaddr-so-reuseport-not-consistent-with-linux">blog</a>
*/
@Test
public void allTest() throws Exception {
- normalSyncClientManagersTest();
- normalAsyncClientManagersTest();
- MaxIdleClientManagersTest();
- MaxTotalClientManagersTest();
- MaxWaitClientTimeoutClientManagersTest();
- InvalidSyncClientReturnClientManagersTest();
- InvalidAsyncClientReturnClientManagersTest();
+ normalSyncTest();
+ normalAsyncTest();
+ maxIdleTest();
+ maxTotalTest();
+ maxWaitClientTimeoutTest();
+ invalidSyncClientReturnTest();
+ invalidAsyncClientReturnTest();
+ borrowNullTest();
}
- public void normalSyncClientManagersTest() throws Exception {
+ public void normalSyncTest() throws Exception {
// init syncClientManager
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
(ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
@@ -117,7 +123,7 @@ public class ClientManagerTest {
Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
}
- public void normalAsyncClientManagersTest() throws Exception {
+ public void normalAsyncTest() throws Exception {
// init asyncClientManager
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
(ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
@@ -158,7 +164,7 @@ public class ClientManagerTest {
Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
}
- public void MaxIdleClientManagersTest() throws Exception {
+ public void maxIdleTest() throws Exception {
int maxIdleClientForEachNode = 1;
// init syncClientManager and set maxIdleClientForEachNode to 1
@@ -173,7 +179,7 @@ public class ClientManagerTest {
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
- manager, new ClientFactoryProperty.Builder().build()),
+ manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setMaxIdleClientForEachNode(maxIdleClientForEachNode)
.build()
@@ -217,7 +223,7 @@ public class ClientManagerTest {
Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
}
- public void MaxTotalClientManagersTest() throws Exception {
+ public void maxTotalTest() throws Exception {
int maxTotalClientForEachNode = 1;
long waitClientTimeoutMs = TimeUnit.SECONDS.toMillis(1);
@@ -233,7 +239,7 @@ public class ClientManagerTest {
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
- manager, new ClientFactoryProperty.Builder().build()),
+ manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setMaxTotalClientForEachNode(maxTotalClientForEachNode)
.setWaitClientTimeoutMS(waitClientTimeoutMs)
@@ -257,6 +263,7 @@ public class ClientManagerTest {
try {
start = System.nanoTime();
syncClient2 = syncClusterManager.borrowClient(endPoint);
+ Assert.fail();
} catch (ClientManagerException e) {
long end = System.nanoTime();
Assert.assertTrue(end - start >= waitClientTimeoutMs * 1_000_000);
@@ -289,7 +296,7 @@ public class ClientManagerTest {
Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
}
- public void MaxWaitClientTimeoutClientManagersTest() throws Exception {
+ public void maxWaitClientTimeoutTest() throws Exception {
long waitClientTimeoutMS = TimeUnit.SECONDS.toMillis(2);
int maxTotalClientForEachNode = 1;
@@ -306,7 +313,7 @@ public class ClientManagerTest {
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
- manager, new ClientFactoryProperty.Builder().build()),
+ manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
.setWaitClientTimeoutMS(waitClientTimeoutMS)
.setMaxTotalClientForEachNode(maxTotalClientForEachNode)
@@ -329,6 +336,7 @@ public class ClientManagerTest {
try {
start = System.nanoTime();
syncClient1 = syncClusterManager.borrowClient(endPoint);
+ Assert.fail();
} catch (ClientManagerException e) {
long end = System.nanoTime();
Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
@@ -348,7 +356,7 @@ public class ClientManagerTest {
Assert.assertFalse(syncClient1.getInputProtocol().getTransport().isOpen());
}
- public void InvalidSyncClientReturnClientManagersTest() throws Exception {
+ public void invalidSyncClientReturnTest() throws Exception {
// init syncClientManager
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> syncClusterManager =
(ClientManager<TEndPoint, SyncDataNodeInternalServiceClient>)
@@ -391,7 +399,7 @@ public class ClientManagerTest {
Assert.assertFalse(syncClient2.getInputProtocol().getTransport().isOpen());
}
- public void InvalidAsyncClientReturnClientManagersTest() throws Exception {
+ public void invalidAsyncClientReturnTest() throws Exception {
// init asyncClientManager
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
(ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
@@ -432,28 +440,51 @@ public class ClientManagerTest {
Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
}
+ public void borrowNullTest() {
+ // init asyncClientManager
+ ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> asyncClusterManager =
+ (ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>)
+ new IClientManager.Factory<TEndPoint, AsyncDataNodeInternalServiceClient>()
+ .createClientManager(new TestAsyncDataNodeInternalServiceClientPoolFactory());
+
+ try {
+ asyncClusterManager.borrowClient(null);
+ Assert.fail();
+ } catch (ClientManagerException e) {
+ Assert.assertTrue(e instanceof BorrowNullClientManagerException);
+ Assert.assertTrue(e.getMessage().contains("Can not borrow client for node null"));
+ }
+
+ // close asyncClientManager, asyncClientManager should destroy all client
+ asyncClusterManager.close();
+ Assert.assertEquals(0, asyncClusterManager.getPool().getNumActive(endPoint));
+ Assert.assertEquals(0, asyncClusterManager.getPool().getNumIdle(endPoint));
+ }
+
public static class TestSyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
+
@Override
public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new SyncDataNodeInternalServiceClient.Factory(
- manager, new ClientFactoryProperty.Builder().build()),
+ manager, new ThriftClientProperty.Builder().build()),
new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>().build().getConfig());
}
}
public static class TestAsyncDataNodeInternalServiceClientPoolFactory
implements IClientPoolFactory<TEndPoint, AsyncDataNodeInternalServiceClient> {
+
@Override
public KeyedObjectPool<TEndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
ClientManager<TEndPoint, AsyncDataNodeInternalServiceClient> manager) {
return new GenericKeyedObjectPool<>(
new AsyncDataNodeInternalServiceClient.Factory(
manager,
- new ClientFactoryProperty.Builder().build(),
- "AsyncDataNodeInternalServiceClientPool"),
+ new ThriftClientProperty.Builder().build(),
+ ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>().build().getConfig());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
index a41ef93b2b..1091bcea22 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -38,8 +38,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TCheckUserPrivilegesReq;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.statement.sys.AuthorStatement;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
@@ -66,9 +66,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
private IAuthorizer authorizer;
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
- CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
public ClusterAuthorityFetcher(IAuthorCache iAuthorCache) {
this.iAuthorCache = iAuthorCache;
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index ac6f4d0a49..393300c5b7 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
-import org.apache.iotdb.commons.client.BaseClientFactory;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
-import org.apache.iotdb.commons.client.sync.SyncThriftClient;
+import org.apache.iotdb.commons.client.ThriftClient;
+import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
@@ -126,12 +126,12 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
+
+public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClient, AutoCloseable {
-public class ConfigNodeClient
- implements IConfigNodeRPCService.Iface, SyncThriftClient, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConfigNodeClient.class);
private static final int RETRY_NUM = 5;
@@ -167,7 +167,7 @@ public class ConfigNodeClient
// Read config nodes from configuration
configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
protocolFactory =
- CommonDescriptor.getInstance().getConfig().isCnRpcThriftCompressionEnabled()
+ CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
? new TCompactProtocol.Factory()
: new TBinaryProtocol.Factory();
@@ -283,7 +283,7 @@ public class ConfigNodeClient
@Override
public void invalidate() {
- transport.close();
+ Optional.ofNullable(transport).ifPresent(TTransport::close);
}
@Override
@@ -1838,12 +1838,12 @@ public class ConfigNodeClient
throw new TException(MSG_RECONNECTION_FAIL);
}
- public static class Factory extends BaseClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
+ public static class Factory extends ThriftClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
public Factory(
ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
- ClientFactoryProperty clientFactoryProperty) {
- super(clientManager, clientFactoryProperty);
+ ThriftClientProperty thriftClientProperty) {
+ super(clientManager, thriftClientProperty);
}
@Override
@@ -1855,15 +1855,13 @@ public class ConfigNodeClient
@Override
public PooledObject<ConfigNodeClient> makeObject(ConfigNodeRegionId configNodeRegionId)
throws Exception {
- Constructor<ConfigNodeClient> constructor =
- ConfigNodeClient.class.getConstructor(
- TProtocolFactory.class, long.class, clientManager.getClass());
return new DefaultPooledObject<>(
SyncThriftClientWithErrorHandler.newErrorHandler(
ConfigNodeClient.class,
- constructor,
- clientFactoryProperty.getProtocolFactory(),
- clientFactoryProperty.getConnectionTimeoutMs(),
+ ConfigNodeClient.class.getConstructor(
+ TProtocolFactory.class, long.class, clientManager.getClass()),
+ thriftClientProperty.getProtocolFactory(),
+ thriftClientProperty.getConnectionTimeoutMs(),
clientManager));
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
similarity index 52%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
copy to server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
index 439f25b655..12df1907bb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClientManager.java
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
@@ -17,10 +17,19 @@
* under the License.
*/
-package org.apache.iotdb.commons.client.exception;
+package org.apache.iotdb.db.client;
-public class ClientManagerException extends Exception {
- public ClientManagerException(Exception exception) {
- super(exception);
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
+
+public class ConfigNodeClientManager {
+ private static final class ConfigNodeClientManagerHolder {
+ private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient> INSTANCE =
+ new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
+ .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ }
+
+ public static IClientManager<ConfigNodeRegionId, ConfigNodeClient> getInstance() {
+ return ConfigNodeClientManagerHolder.INSTANCE;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
index b5fb12c209..b1451c9337 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/DataNodeClientPoolFactory.java
@@ -19,17 +19,10 @@
package org.apache.iotdb.db.client;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.commons.client.ClientFactoryProperty;
import org.apache.iotdb.commons.client.ClientManager;
-import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientPoolFactory;
-import org.apache.iotdb.commons.client.async.AsyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.async.AsyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
-import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty;
+import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -43,117 +36,18 @@ public class DataNodeClientPoolFactory {
private DataNodeClientPoolFactory() {}
- public static class SyncConfigNodeIServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, SyncConfigNodeIServiceClient> {
- @Override
- public KeyedObjectPool<TEndPoint, SyncConfigNodeIServiceClient> createClientPool(
- ClientManager<TEndPoint, SyncConfigNodeIServiceClient> manager) {
- return new GenericKeyedObjectPool<>(
- new SyncConfigNodeIServiceClient.Factory(
- manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
- new ClientPoolProperty.Builder<SyncConfigNodeIServiceClient>().build().getConfig());
- }
- }
-
- public static class AsyncConfigNodeIServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AsyncConfigNodeIServiceClient> {
- @Override
- public KeyedObjectPool<TEndPoint, AsyncConfigNodeIServiceClient> createClientPool(
- ClientManager<TEndPoint, AsyncConfigNodeIServiceClient> manager) {
- return new GenericKeyedObjectPool<>(
- new AsyncConfigNodeIServiceClient.Factory(
- manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build(),
- ThreadName.ASYNC_CONFIGNODE_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncConfigNodeIServiceClient>()
- .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
- .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
- .build()
- .getConfig());
- }
- }
-
- public static class SyncDataNodeInternalServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, SyncDataNodeInternalServiceClient> {
- @Override
- public KeyedObjectPool<TEndPoint, SyncDataNodeInternalServiceClient> createClientPool(
- ClientManager<TEndPoint, SyncDataNodeInternalServiceClient> manager) {
- return new GenericKeyedObjectPool<>(
- new SyncDataNodeInternalServiceClient.Factory(
- manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
- new ClientPoolProperty.Builder<SyncDataNodeInternalServiceClient>()
- .setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
- .setMaxTotalClientForEachNode(conf.getMaxConnectionForInternalService())
- .build()
- .getConfig());
- }
- }
-
- public static class SyncDataNodeMPPDataExchangeServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> {
- @Override
- public KeyedObjectPool<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> createClientPool(
- ClientManager<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient> manager) {
- return new GenericKeyedObjectPool<>(
- new SyncDataNodeMPPDataExchangeServiceClient.Factory(
- manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build()),
- new ClientPoolProperty.Builder<SyncDataNodeMPPDataExchangeServiceClient>()
- .build()
- .getConfig());
- }
- }
-
- public static class AsyncDataNodeMPPDataExchangeServiceClientPoolFactory
- implements IClientPoolFactory<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> {
- @Override
- public KeyedObjectPool<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> createClientPool(
- ClientManager<TEndPoint, AsyncDataNodeMPPDataExchangeServiceClient> manager) {
- return new GenericKeyedObjectPool<>(
- new AsyncDataNodeMPPDataExchangeServiceClient.Factory(
- manager,
- new ClientFactoryProperty.Builder()
- .setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
- .setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
- .build(),
- ThreadName.ASYNC_DATANODE_MPP_DATA_EXCHANGE_CLIENT_POOL.getName()),
- new ClientPoolProperty.Builder<AsyncDataNodeMPPDataExchangeServiceClient>()
- .build()
- .getConfig());
- }
- }
-
public static class ConfigNodeClientPoolFactory
implements IClientPoolFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
@Override
public KeyedObjectPool<ConfigNodeRegionId, ConfigNodeClient> createClientPool(
ClientManager<ConfigNodeRegionId, ConfigNodeClient> manager) {
return new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
+ new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
- .setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
.build()),
new ClientPoolProperty.Builder<ConfigNodeClient>()
.setMaxIdleClientForEachNode(conf.getCoreConnectionForInternalService())
@@ -165,13 +59,14 @@ public class DataNodeClientPoolFactory {
public static class ClusterDeletionConfigNodeClientPoolFactory
implements IClientPoolFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
@Override
public KeyedObjectPool<ConfigNodeRegionId, ConfigNodeClient> createClientPool(
ClientManager<ConfigNodeRegionId, ConfigNodeClient> manager) {
return new GenericKeyedObjectPool<>(
new ConfigNodeClient.Factory(
manager,
- new ClientFactoryProperty.Builder()
+ new ThriftClientProperty.Builder()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS() * 10)
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnable())
.setSelectorNumOfAsyncClientManager(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
index 19564f2528..88f87b6639 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
@@ -32,8 +32,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPathsSetTemplatesResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetTemplateResp;
import org.apache.iotdb.confignode.rpc.thrift.TSetSchemaTemplateReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.CreateSchemaTemplateStatement;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.utils.Pair;
@@ -79,9 +79,7 @@ public class ClusterTemplateManager implements ITemplateManager {
}
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
- CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
@Override
public TSStatus createSchemaTemplate(CreateSchemaTemplateStatement statement) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
index f1ee4b6438..fd935ff3c8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeService.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -30,7 +31,6 @@ import org.apache.iotdb.commons.service.ServiceType;
import org.apache.iotdb.commons.service.ThriftService;
import org.apache.iotdb.commons.service.ThriftServiceThread;
import org.apache.iotdb.commons.service.metric.MetricService;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
@@ -69,8 +69,7 @@ public class MPPDataExchangeService extends ThriftService implements MPPDataExch
executorService,
new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
.createClientManager(
- new DataNodeClientPoolFactory
- .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+ new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
LOGGER.info("MPPDataExchangeManager init successfully");
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
index 3a3002850e..8d18d611c1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/Coordinator.java
@@ -19,11 +19,11 @@
package org.apache.iotdb.db.mpp.plan;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
@@ -54,6 +54,7 @@ import java.util.concurrent.ScheduledExecutorService;
* QueryExecution.
*/
public class Coordinator {
+
private static final Logger LOGGER = LoggerFactory.getLogger(Coordinator.class);
private static final String COORDINATOR_EXECUTOR_NAME = "MPPCoordinator";
@@ -69,7 +70,7 @@ public class Coordinator {
INTERNAL_SERVICE_CLIENT_MANAGER =
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
.createClientManager(
- new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
@@ -213,6 +214,11 @@ public class Coordinator {
}
}
+ public IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
+ getInternalServiceClientManager() {
+ return INTERNAL_SERVICE_CLIENT_MANAGER;
+ }
+
public static Coordinator getInstance() {
return INSTANCE;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 872e1eb66d..a0569e390a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -33,7 +34,6 @@ import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerReq;
import org.apache.iotdb.consensus.iot.thrift.TInactivatePeerRes;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadReq;
import org.apache.iotdb.consensus.iot.thrift.TTriggerSnapshotLoadRes;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
@@ -45,7 +45,7 @@ public class TestRPCClient {
INTERNAL_SERVICE_CLIENT_MANAGER =
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
.createClientManager(
- new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
@@ -66,7 +66,7 @@ public class TestRPCClient {
private void loadSnapshot() {
try (SyncIoTConsensusServiceClient client =
- syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 10761))) {
+ syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40011))) {
TTriggerSnapshotLoadRes res =
client.triggerSnapshotLoad(
new TTriggerSnapshotLoadReq(
@@ -79,7 +79,7 @@ public class TestRPCClient {
private void testAddPeer() {
try (SyncIoTConsensusServiceClient client =
- syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 10762))) {
+ syncClientManager.borrowClient(new TEndPoint("127.0.0.1", 40012))) {
TInactivatePeerRes res =
client.inactivatePeer(
new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId()));
@@ -91,7 +91,7 @@ public class TestRPCClient {
private void removeRegionPeer() {
try (SyncDataNodeInternalServiceClient client =
- INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10730))) {
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
client.removeRegionPeer(
new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
} catch (Exception e) {
@@ -101,7 +101,7 @@ public class TestRPCClient {
private void addPeer() {
try (SyncDataNodeInternalServiceClient client =
- INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10730))) {
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
client.addRegionPeer(
new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
} catch (Exception e) {
@@ -131,7 +131,7 @@ public class TestRPCClient {
private void createDataRegion() {
try (SyncDataNodeInternalServiceClient client =
- INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 10732))) {
+ INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9005))) {
TCreateDataRegionReq req = new TCreateDataRegionReq();
req.setStorageGroup("root.test.g_0");
TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 2c70aaf3d0..e3e937d04d 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -40,8 +40,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -72,8 +72,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
private final PartitionCache partitionCache;
private final IClientManager<ConfigNodeRegionId, ConfigNodeClient> configNodeClientManager =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ ConfigNodeClientManager.getInstance();
private static final class ClusterPartitionFetcherHolder {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index 040c568b40..249be0db20 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -42,8 +42,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.sql.StatementAnalyzeException;
@@ -106,8 +106,7 @@ public class PartitionCache {
private final ReentrantReadWriteLock regionReplicaSetLock = new ReentrantReadWriteLock();
private final IClientManager<ConfigNodeRegionId, ConfigNodeClient> configNodeClientManager =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ ConfigNodeClientManager.getInstance();
public PartitionCache() {
this.schemaPartitionCache = Caffeine.newBuilder().maximumSize(cacheSize).build();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 11f6ad0494..a3753360c9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.confignode.rpc.thrift.TUnsetSchemaTemplateReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
@@ -176,10 +177,9 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterConfigTaskExecutor.class);
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
- CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
+ /** FIXME Consolidate this clientManager with the upper one. */
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER =
new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
@@ -187,6 +187,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
new DataNodeClientPoolFactory.ClusterDeletionConfigNodeClientPoolFactory());
private static final class ClusterConfigTaskExecutorHolder {
+
private static final ClusterConfigTaskExecutor INSTANCE = new ClusterConfigTaskExecutor();
private ClusterConfigTaskExecutorHolder() {}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
index e596c89ede..10d4376d2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/common/ClusterSyncInfoFetcher.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipeSinkResp;
import org.apache.iotdb.confignode.rpc.thrift.TRecordPipeMessageReq;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.plan.statement.sys.sync.CreatePipeSinkStatement;
import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
import org.apache.iotdb.rpc.RpcUtils;
@@ -50,9 +50,7 @@ public class ClusterSyncInfoFetcher implements ISyncInfoFetcher {
private static final Logger LOGGER = LoggerFactory.getLogger(ClusterSyncInfoFetcher.class);
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
- CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
// region Interfaces of PipeSink
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index 8d5eb5cdcf..af2f1e5179 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.trigger.executor;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
@@ -31,9 +30,10 @@ import org.apache.iotdb.commons.trigger.TriggerTable;
import org.apache.iotdb.commons.trigger.exception.TriggerExecutionException;
import org.apache.iotdb.confignode.rpc.thrift.TTriggerState;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
@@ -68,16 +68,8 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerFireVisitor.class);
- private static final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
- INTERNAL_SERVICE_CLIENT_MANAGER =
- new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
-
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
- CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
/**
* How many times should we retry when error occurred during firing a trigger on another datanode
@@ -328,7 +320,9 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
TriggerManagementService.getInstance()
.getDataNodeLocationOfStatefulTrigger(triggerName);
try (SyncDataNodeInternalServiceClient client =
- INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(tDataNodeLocation.getInternalEndPoint())) {
+ Coordinator.getInstance()
+ .getInternalServiceClientManager()
+ .borrowClient(tDataNodeLocation.getInternalEndPoint())) {
TFireTriggerReq req = new TFireTriggerReq(triggerName, tablet.serialize(), event.getId());
TFireTriggerResp resp = client.fireTrigger(req);
if (resp.foundExecutor) {
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
index 0aadf06daa..f4b015074d 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/service/TriggerInformationUpdater.java
@@ -27,8 +27,8 @@ import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -45,9 +45,7 @@ public class TriggerInformationUpdater {
private static final Logger LOGGER = LoggerFactory.getLogger(TriggerInformationUpdater.class);
private static final IClientManager<ConfigNodeRegionId, ConfigNodeClient>
- CONFIG_NODE_CLIENT_MANAGER =
- new IClientManager.Factory<ConfigNodeRegionId, ConfigNodeClient>()
- .createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
+ CONFIG_NODE_CLIENT_MANAGER = ConfigNodeClientManager.getInstance();
private final ScheduledExecutorService triggerInformationUpdateExecutor =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
index bc0c5d5207..add9d54009 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/MPPDataExchangeManagerTest.java
@@ -20,9 +20,9 @@
package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.db.mpp.execution.memory.MemoryPool;
@@ -55,8 +55,7 @@ public class MPPDataExchangeManagerTest {
Executors.newSingleThreadExecutor(),
new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
.createClientManager(
- new DataNodeClientPoolFactory
- .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+ new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
ISinkHandle localSinkHandle =
mppDataExchangeManager.createLocalSinkHandle(
@@ -98,8 +97,7 @@ public class MPPDataExchangeManagerTest {
Executors.newSingleThreadExecutor(),
new IClientManager.Factory<TEndPoint, SyncDataNodeMPPDataExchangeServiceClient>()
.createClientManager(
- new DataNodeClientPoolFactory
- .SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
+ new ClientPoolFactory.SyncDataNodeMPPDataExchangeServiceClientPoolFactory()));
ISourceHandle localSourceHandle =
mppDataExchangeManager.createLocalSourceHandle(
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
index de492a843b..5d0c47d011 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryPlannerTest.java
@@ -20,10 +20,10 @@
package org.apache.iotdb.db.mpp.plan.plan;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
@@ -52,7 +52,7 @@ public class QueryPlannerTest {
internalServiceClientManager =
new IClientManager.Factory<TEndPoint, SyncDataNodeInternalServiceClient>()
.createClientManager(
- new DataNodeClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
+ new ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
}
@AfterClass