You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/04/19 09:43:31 UTC
[iotdb] 03/03: make interface more generic
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch client_manager
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0953f6a72ad54349fd3aced1b4e492c18d31743d
Author: LebronAl <TX...@gmail.com>
AuthorDate: Tue Apr 19 17:42:28 2022 +0800
make interface more generic
---
.../iotdb/confignode/client/ClientPoolFactory.java | 8 +--
.../iotdb/consensus/ratis/RatisClientFactory.java | 67 ++++++++++++++++++++++
.../iotdb/consensus/ratis/RatisConsensus.java | 56 +++++++++++++-----
node-commons/pom.xml | 4 ++
.../commons/client/AsyncBaseClientFactory.java | 4 +-
.../iotdb/commons/client/BaseClientFactory.java | 12 ++--
.../apache/iotdb/commons/client/ClientManager.java | 19 +++---
.../commons/client/ClientManagerProperty.java | 26 ++++-----
.../iotdb/commons/client/IClientManager.java | 11 ++--
.../iotdb/commons/client/IClientPoolFactory.java | 7 +--
.../async/AsyncDataNodeInternalServiceClient.java | 6 +-
.../sync/SyncDataNodeInternalServiceClient.java | 6 +-
.../apache/iotdb/db/client/ClientPoolFactory.java | 16 +++---
.../async/AsyncConfigNodeIServiceClient.java | 6 +-
.../async/AsyncDataNodeDataBlockServiceClient.java | 6 +-
.../client/sync/SyncConfigNodeIServiceClient.java | 6 +-
.../sync/SyncDataNodeDataBlockServiceClient.java | 6 +-
17 files changed, 177 insertions(+), 89 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
index 73bb856065..d8731cb952 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/ClientPoolFactory.java
@@ -36,10 +36,10 @@ public class ClientPoolFactory {
private static final ConfigNodeConf conf = ConfigNodeDescriptor.getInstance().getConf();
public static class SyncDataNodeInternalServiceClientPoolFactory
- implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+ implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> {
@Override
public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
- ClientManager<SyncDataNodeInternalServiceClient> manager) {
+ ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) {
ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -51,10 +51,10 @@ public class ClientPoolFactory {
}
public static class AsyncDataNodeInternalServiceClientPoolFactory
- implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+ implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
@Override
public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
- ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+ ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) {
ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java
new file mode 100644
index 0000000000..27d5f7a640
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisClientFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.commons.client.BaseClientFactory;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroup;
+
+public class RatisClientFactory extends BaseClientFactory<RaftGroup, RaftClient> {
+
+ private final RaftProperties raftProperties;
+ private final RaftClientRpc clientRpc;
+
+ public RatisClientFactory(
+ ClientManager<RaftGroup, RaftClient> clientManager,
+ ClientManagerProperty<RaftClient> clientManagerProperty,
+ RaftProperties raftProperties,
+ RaftClientRpc clientRpc) {
+ super(clientManager, clientManagerProperty);
+ this.raftProperties = raftProperties;
+ this.clientRpc = clientRpc;
+ }
+
+ @Override
+ public void destroyObject(RaftGroup key, PooledObject<RaftClient> pooledObject) throws Exception {
+ pooledObject.getObject().close();
+ }
+
+ @Override
+ public PooledObject<RaftClient> makeObject(RaftGroup group) throws Exception {
+ return new DefaultPooledObject<>(
+ RaftClient.newBuilder()
+ .setProperties(raftProperties)
+ .setRaftGroup(group)
+ .setClientRpc(clientRpc)
+ .build());
+ }
+
+ @Override
+ public boolean validateObject(RaftGroup key, PooledObject<RaftClient> pooledObject) {
+ return true;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index a87cafd2fe..cbf3f713f6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.consensus.ratis;
import org.apache.iotdb.common.rpc.thrift.EndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientManager;
+import org.apache.iotdb.commons.client.ClientManagerProperty;
+import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.cluster.Endpoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.IConsensus;
@@ -37,7 +40,10 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
@@ -74,41 +80,40 @@ import java.util.stream.Collectors;
* <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
*/
class RatisConsensus implements IConsensus {
+
+ private final Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
+
// the unique net communication endpoint
private final RaftPeer myself;
-
private final RaftServer server;
- private final Map<RaftGroupId, RaftClient> clientMap;
- private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+ private final RaftProperties properties = new RaftProperties();;
+ private final RaftClientRpc clientRpc;
+
+ private final ClientManager<RaftGroup, RaftClient> clientManager =
+ new ClientManager<>(new RatisClientPoolFactory());
+ private final Map<RaftGroupId, RaftGroup> raftGroupMap = new ConcurrentHashMap<>();
+ private final Map<RaftGroupId, RaftClient> clientMap = new ConcurrentHashMap<>();
- private ClientId localFakeId;
- private AtomicLong localFakeCallId;
+ private final ClientId localFakeId = ClientId.randomId();
+ private final AtomicLong localFakeCallId = new AtomicLong(0);
private static final int DEFAULT_PRIORITY = 0;
private static final int LEADER_PRIORITY = 1;
- private Logger logger = LoggerFactory.getLogger(RatisConsensus.class);
-
public RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
throws IOException {
- this.clientMap = new ConcurrentHashMap<>();
- this.raftGroupMap = new ConcurrentHashMap<>();
- this.localFakeId = ClientId.randomId();
- this.localFakeCallId = new AtomicLong(0);
-
// create a RaftPeer as endpoint of comm
String address = Utils.IPAddress(endpoint);
myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
- RaftProperties properties = new RaftProperties();
-
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
// set the port which server listen to in RaftProperty object
final int port = NetUtils.createSocketAddr(address).getPort();
GrpcConfigKeys.Server.setPort(properties, port);
+ clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
server =
RaftServer.newBuilder()
@@ -128,6 +133,7 @@ class RatisConsensus implements IConsensus {
@Override
public void stop() throws IOException {
+ clientManager.close();
server.close();
}
@@ -443,7 +449,7 @@ class RatisConsensus implements IConsensus {
isLeader = server.getDivision(raftGroupId).getInfo().isLeader();
} catch (IOException exception) {
// if the query fails, simply return not leader
- logger.warn("isLeader request failed with exception: ", exception);
+ logger.info("isLeader request failed with exception: ", exception);
isLeader = false;
}
return isLeader;
@@ -516,6 +522,15 @@ class RatisConsensus implements IConsensus {
return client;
}
+ private RaftClient getRaftClient(RaftGroup group) {
+ try {
+ return clientManager.borrowClient(group);
+ } catch (IOException e) {
+ logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
+ return null;
+ }
+ }
+
private void closeRaftClient(RaftGroupId groupId) {
RaftClient client = clientMap.get(groupId);
if (client != null) {
@@ -563,4 +578,15 @@ class RatisConsensus implements IConsensus {
}
return reply;
}
+
+ private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RaftClient> {
+ @Override
+ public KeyedObjectPool<RaftGroup, RaftClient> createClientPool(
+ ClientManager<RaftGroup, RaftClient> manager) {
+ ClientManagerProperty<RaftClient> property =
+ new ClientManagerProperty.Builder<RaftClient>().build();
+ return new GenericKeyedObjectPool<>(
+ new RatisClientFactory(manager, property, properties, clientRpc), property.getConfig());
+ }
+ }
}
diff --git a/node-commons/pom.xml b/node-commons/pom.xml
index f4e3587e38..a88c22bbf3 100644
--- a/node-commons/pom.xml
+++ b/node-commons/pom.xml
@@ -53,6 +53,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
index b710f5482c..b822b0043b 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/AsyncBaseClientFactory.java
@@ -26,14 +26,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
-public abstract class AsyncBaseClientFactory<K, T> extends BaseClientFactory<K, T> {
+public abstract class AsyncBaseClientFactory<K, V> extends BaseClientFactory<K, V> {
private static final Logger logger = LoggerFactory.getLogger(AsyncBaseClientFactory.class);
protected TAsyncClientManager[] tManagers;
protected AtomicInteger clientCnt = new AtomicInteger();
protected AsyncBaseClientFactory(
- ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+ ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) {
super(clientManager, clientManagerProperty);
tManagers = new TAsyncClientManager[clientManagerProperty.getSelectorNumOfAsyncClientPool()];
for (int i = 0; i < tManagers.length; i++) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
index f628692152..1f85805691 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/BaseClientFactory.java
@@ -22,20 +22,20 @@ package org.apache.iotdb.commons.client;
import org.apache.commons.pool2.KeyedPooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
-public abstract class BaseClientFactory<K, T> implements KeyedPooledObjectFactory<K, T> {
+public abstract class BaseClientFactory<K, V> implements KeyedPooledObjectFactory<K, V> {
- protected ClientManager<T> clientManager;
- protected ClientManagerProperty<T> clientManagerProperty;
+ protected ClientManager<K, V> clientManager;
+ protected ClientManagerProperty<V> clientManagerProperty;
public BaseClientFactory(
- ClientManager<T> clientManager, ClientManagerProperty<T> clientManagerProperty) {
+ ClientManager<K, V> clientManager, ClientManagerProperty<V> clientManagerProperty) {
this.clientManager = clientManager;
this.clientManagerProperty = clientManagerProperty;
}
@Override
- public void activateObject(K node, PooledObject<T> pooledObject) {}
+ public void activateObject(K node, PooledObject<V> pooledObject) {}
@Override
- public void passivateObject(K node, PooledObject<T> pooledObject) {}
+ public void passivateObject(K node, PooledObject<V> pooledObject) {}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 0513af1a5d..de3b8947cb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -19,31 +19,28 @@
package org.apache.iotdb.commons.client;
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
import org.apache.commons.pool2.KeyedObjectPool;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Optional;
-public class ClientManager<E> implements IClientManager<E> {
+public class ClientManager<K, V> implements IClientManager<K, V> {
private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
- private final KeyedObjectPool<EndPoint, E> pool;
+ private final KeyedObjectPool<K, V> pool;
- public ClientManager(IClientPoolFactory<E> factory) {
+ public ClientManager(IClientPoolFactory<K, V> factory) {
pool = factory.createClientPool(this);
}
@Override
- public Optional<E> borrowClient(EndPoint node) throws IOException {
- Optional<E> client = Optional.empty();
+ public V borrowClient(K node) throws IOException {
+ V client = null;
try {
- client = Optional.of(pool.borrowObject(node));
+ client = pool.borrowObject(node);
} catch (TTransportException e) {
// external needs to check transport related exception
throw new IOException(e);
@@ -58,7 +55,7 @@ public class ClientManager<E> implements IClientManager<E> {
}
@Override
- public void returnClient(EndPoint node, E client) {
+ public void returnClient(K node, V client) {
if (client != null && node != null) {
try {
pool.returnObject(node, client);
@@ -70,7 +67,7 @@ public class ClientManager<E> implements IClientManager<E> {
}
@Override
- public void clear(EndPoint node) {
+ public void clear(K node) {
if (node != null) {
try {
pool.clear(node);
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
index b35f163257..5fc95174db 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManagerProperty.java
@@ -26,9 +26,9 @@ import org.apache.thrift.protocol.TProtocolFactory;
import java.time.Duration;
-public class ClientManagerProperty<T> {
+public class ClientManagerProperty<V> {
- private final GenericKeyedObjectPoolConfig<T> config;
+ private final GenericKeyedObjectPoolConfig<V> config;
// thrift client config
private final TProtocolFactory protocolFactory;
@@ -36,7 +36,7 @@ public class ClientManagerProperty<T> {
private int selectorNumOfAsyncClientPool = 1;
public ClientManagerProperty(
- GenericKeyedObjectPoolConfig<T> config,
+ GenericKeyedObjectPoolConfig<V> config,
TProtocolFactory protocolFactory,
int connectionTimeoutMs,
int selectorNumOfAsyncClientPool) {
@@ -46,7 +46,7 @@ public class ClientManagerProperty<T> {
this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
}
- public GenericKeyedObjectPoolConfig<T> getConfig() {
+ public GenericKeyedObjectPoolConfig<V> getConfig() {
return config;
}
@@ -62,7 +62,7 @@ public class ClientManagerProperty<T> {
return selectorNumOfAsyncClientPool;
}
- public static class Builder<T> {
+ public static class Builder<V> {
// pool config
private long waitClientTimeoutMS = 20_000;
@@ -74,38 +74,38 @@ public class ClientManagerProperty<T> {
private int connectionTimeoutMs = 20_000;
private int selectorNumOfAsyncClientPool = 1;
- public Builder<T> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
+ public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
this.waitClientTimeoutMS = waitClientTimeoutMS;
return this;
}
- public Builder<T> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
+ public Builder<V> setMaxConnectionForEachNode(int maxConnectionForEachNode) {
this.maxConnectionForEachNode = maxConnectionForEachNode;
return this;
}
- public Builder<T> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
+ public Builder<V> setMaxIdleConnectionForEachNode(int maxIdleConnectionForEachNode) {
this.maxIdleConnectionForEachNode = maxIdleConnectionForEachNode;
return this;
}
- public Builder<T> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
+ public Builder<V> setRpcThriftCompressionEnabled(boolean rpcThriftCompressionEnabled) {
this.rpcThriftCompressionEnabled = rpcThriftCompressionEnabled;
return this;
}
- public Builder<T> setConnectionTimeoutMs(int connectionTimeoutMs) {
+ public Builder<V> setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
return this;
}
- public Builder<T> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
+ public Builder<V> setSelectorNumOfAsyncClientPool(int selectorNumOfAsyncClientPool) {
this.selectorNumOfAsyncClientPool = selectorNumOfAsyncClientPool;
return this;
}
- public ClientManagerProperty<T> build() {
- GenericKeyedObjectPoolConfig<T> poolConfig = new GenericKeyedObjectPoolConfig<>();
+ public ClientManagerProperty<V> build() {
+ GenericKeyedObjectPoolConfig<V> poolConfig = new GenericKeyedObjectPoolConfig<>();
poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
poolConfig.setMaxIdlePerKey(maxIdleConnectionForEachNode);
poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index 00ee3d45a5..529c78e8b1 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -19,18 +19,15 @@
package org.apache.iotdb.commons.client;
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
import java.io.IOException;
-import java.util.Optional;
-public interface IClientManager<E> {
+public interface IClientManager<K, V> {
- Optional<E> borrowClient(EndPoint node) throws IOException;
+ V borrowClient(K node) throws IOException;
- void returnClient(EndPoint node, E client);
+ void returnClient(K node, V client);
- void clear(EndPoint node);
+ void clear(K node);
void close();
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
index 80341bf5f5..aa7980decb 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -19,12 +19,9 @@
package org.apache.iotdb.commons.client;
-import org.apache.iotdb.common.rpc.thrift.EndPoint;
-
import org.apache.commons.pool2.KeyedObjectPool;
-public interface IClientPoolFactory<E> {
+public interface IClientPoolFactory<K, V> {
- KeyedObjectPool<EndPoint, E> createClientPool(
- ClientManager<E> manager);
+ KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
index 6119b799a8..8547d13890 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/async/AsyncDataNodeInternalServiceClient.java
@@ -37,14 +37,14 @@ import java.io.IOException;
public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncClient {
private final EndPoint endpoint;
- private final ClientManager<AsyncDataNodeInternalServiceClient> clientManager;
+ private final ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager;
public AsyncDataNodeInternalServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
EndPoint endpoint,
TAsyncClientManager tClientManager,
- ClientManager<AsyncDataNodeInternalServiceClient> clientManager)
+ ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager)
throws IOException {
super(
protocolFactory,
@@ -92,7 +92,7 @@ public class AsyncDataNodeInternalServiceClient extends InternalService.AsyncCli
extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
public Factory(
- ClientManager<AsyncDataNodeInternalServiceClient> clientManager,
+ ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> clientManager,
ClientManagerProperty<AsyncDataNodeInternalServiceClient> clientManagerProperty) {
super(clientManager, clientManagerProperty);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
index a40104bbbc..c273809554 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/sync/SyncDataNodeInternalServiceClient.java
@@ -40,13 +40,13 @@ import java.net.SocketException;
public class SyncDataNodeInternalServiceClient extends InternalService.Client {
private final EndPoint endpoint;
- private final ClientManager<SyncDataNodeInternalServiceClient> clientManager;
+ private final ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager;
public SyncDataNodeInternalServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
EndPoint endpoint,
- ClientManager<SyncDataNodeInternalServiceClient> clientManager)
+ ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager)
throws TTransportException {
super(
protocolFactory.getProtocol(
@@ -84,7 +84,7 @@ public class SyncDataNodeInternalServiceClient extends InternalService.Client {
extends BaseClientFactory<EndPoint, SyncDataNodeInternalServiceClient> {
public Factory(
- ClientManager<SyncDataNodeInternalServiceClient> clientManager,
+ ClientManager<EndPoint, SyncDataNodeInternalServiceClient> clientManager,
ClientManagerProperty<SyncDataNodeInternalServiceClient> clientManagerProperty) {
super(clientManager, clientManagerProperty);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
index 63f6c7a45c..e24e5d4e94 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ClientPoolFactory.java
@@ -38,10 +38,10 @@ public class ClientPoolFactory {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
public static class SyncConfigNodeIServiceClientPoolFactory
- implements IClientPoolFactory<SyncConfigNodeIServiceClient> {
+ implements IClientPoolFactory<EndPoint, SyncConfigNodeIServiceClient> {
@Override
public KeyedObjectPool<EndPoint, SyncConfigNodeIServiceClient> createClientPool(
- ClientManager<SyncConfigNodeIServiceClient> manager) {
+ ClientManager<EndPoint, SyncConfigNodeIServiceClient> manager) {
ClientManagerProperty<SyncConfigNodeIServiceClient> property =
new ClientManagerProperty.Builder<SyncConfigNodeIServiceClient>()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -53,10 +53,10 @@ public class ClientPoolFactory {
}
public static class AsyncConfigNodeIServiceClientPoolFactory
- implements IClientPoolFactory<AsyncConfigNodeIServiceClient> {
+ implements IClientPoolFactory<EndPoint, AsyncConfigNodeIServiceClient> {
@Override
public KeyedObjectPool<EndPoint, AsyncConfigNodeIServiceClient> createClientPool(
- ClientManager<AsyncConfigNodeIServiceClient> manager) {
+ ClientManager<EndPoint, AsyncConfigNodeIServiceClient> manager) {
ClientManagerProperty<AsyncConfigNodeIServiceClient> property =
new ClientManagerProperty.Builder<AsyncConfigNodeIServiceClient>()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -69,10 +69,10 @@ public class ClientPoolFactory {
}
public static class SyncDataNodeInternalServiceClientPoolFactory
- implements IClientPoolFactory<SyncDataNodeInternalServiceClient> {
+ implements IClientPoolFactory<EndPoint, SyncDataNodeInternalServiceClient> {
@Override
public KeyedObjectPool<EndPoint, SyncDataNodeInternalServiceClient> createClientPool(
- ClientManager<SyncDataNodeInternalServiceClient> manager) {
+ ClientManager<EndPoint, SyncDataNodeInternalServiceClient> manager) {
ClientManagerProperty<SyncDataNodeInternalServiceClient> property =
new ClientManagerProperty.Builder<SyncDataNodeInternalServiceClient>()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
@@ -84,10 +84,10 @@ public class ClientPoolFactory {
}
public static class AsyncDataNodeInternalServiceClientPoolFactory
- implements IClientPoolFactory<AsyncDataNodeInternalServiceClient> {
+ implements IClientPoolFactory<EndPoint, AsyncDataNodeInternalServiceClient> {
@Override
public KeyedObjectPool<EndPoint, AsyncDataNodeInternalServiceClient> createClientPool(
- ClientManager<AsyncDataNodeInternalServiceClient> manager) {
+ ClientManager<EndPoint, AsyncDataNodeInternalServiceClient> manager) {
ClientManagerProperty<AsyncDataNodeInternalServiceClient> property =
new ClientManagerProperty.Builder<AsyncDataNodeInternalServiceClient>()
.setConnectionTimeoutMs(conf.getConnectionTimeoutInMS())
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
index 00ac180429..48bb173b83 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncConfigNodeIServiceClient.java
@@ -36,14 +36,14 @@ import java.io.IOException;
public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
private final EndPoint endpoint;
- private final ClientManager<AsyncConfigNodeIServiceClient> clientManager;
+ private final ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager;
public AsyncConfigNodeIServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
EndPoint endpoint,
TAsyncClientManager tClientManager,
- ClientManager<AsyncConfigNodeIServiceClient> clientManager)
+ ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager)
throws IOException {
super(
protocolFactory,
@@ -91,7 +91,7 @@ public class AsyncConfigNodeIServiceClient extends ConfigIService.AsyncClient {
extends AsyncBaseClientFactory<EndPoint, AsyncConfigNodeIServiceClient> {
public Factory(
- ClientManager<AsyncConfigNodeIServiceClient> clientManager,
+ ClientManager<EndPoint, AsyncConfigNodeIServiceClient> clientManager,
ClientManagerProperty<AsyncConfigNodeIServiceClient> clientManagerProperty) {
super(clientManager, clientManagerProperty);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
index 1701083464..5b4e81e775 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/async/AsyncDataNodeDataBlockServiceClient.java
@@ -36,14 +36,14 @@ import java.io.IOException;
public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncClient {
private final EndPoint endpoint;
- private final ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager;
+ private final ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager;
public AsyncDataNodeDataBlockServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
EndPoint endpoint,
TAsyncClientManager tClientManager,
- ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager)
+ ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager)
throws IOException {
super(
protocolFactory,
@@ -91,7 +91,7 @@ public class AsyncDataNodeDataBlockServiceClient extends DataBlockService.AsyncC
extends AsyncBaseClientFactory<EndPoint, AsyncDataNodeDataBlockServiceClient> {
public Factory(
- ClientManager<AsyncDataNodeDataBlockServiceClient> clientManager,
+ ClientManager<EndPoint, AsyncDataNodeDataBlockServiceClient> clientManager,
ClientManagerProperty<AsyncDataNodeDataBlockServiceClient> clientManagerProperty) {
super(clientManager, clientManagerProperty);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
index 60e01bfc0c..ab9121f8de 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncConfigNodeIServiceClient.java
@@ -39,13 +39,13 @@ import java.net.SocketException;
public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
private final EndPoint endpoint;
- private final ClientManager<SyncConfigNodeIServiceClient> clientManager;
+ private final ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager;
public SyncConfigNodeIServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
EndPoint endpoint,
- ClientManager<SyncConfigNodeIServiceClient> clientManager)
+ ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager)
throws TTransportException {
super(
protocolFactory.getProtocol(
@@ -82,7 +82,7 @@ public class SyncConfigNodeIServiceClient extends ConfigIService.Client {
public static class Factory extends BaseClientFactory<EndPoint, SyncConfigNodeIServiceClient> {
public Factory(
- ClientManager<SyncConfigNodeIServiceClient> clientManager,
+ ClientManager<EndPoint, SyncConfigNodeIServiceClient> clientManager,
ClientManagerProperty<SyncConfigNodeIServiceClient> clientManagerProperty) {
super(clientManager, clientManagerProperty);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
index f6188709a9..7f598e6159 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/sync/SyncDataNodeDataBlockServiceClient.java
@@ -39,13 +39,13 @@ import java.net.SocketException;
public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client {
private final EndPoint endpoint;
- private final ClientManager<SyncDataNodeDataBlockServiceClient> clientManager;
+ private final ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager;
public SyncDataNodeDataBlockServiceClient(
TProtocolFactory protocolFactory,
int connectionTimeout,
EndPoint endpoint,
- ClientManager<SyncDataNodeDataBlockServiceClient> clientManager)
+ ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager)
throws TTransportException {
super(
protocolFactory.getProtocol(
@@ -83,7 +83,7 @@ public class SyncDataNodeDataBlockServiceClient extends DataBlockService.Client
extends BaseClientFactory<EndPoint, SyncDataNodeDataBlockServiceClient> {
public Factory(
- ClientManager<SyncDataNodeDataBlockServiceClient> clientManager,
+ ClientManager<EndPoint, SyncDataNodeDataBlockServiceClient> clientManager,
ClientManagerProperty<SyncDataNodeDataBlockServiceClient> clientManagerProperty) {
super(clientManager, clientManagerProperty);
}