You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/09/30 07:40:48 UTC
[iotdb] branch cluster- updated: [cluster-refactor] refactor client
pool (#3886)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster- by this push:
new 19d2f70 [cluster-refactor] refactor client pool (#3886)
19d2f70 is described below
commit 19d2f70fbdbf9119383e9e36cbf61d7f6e64d6cd
Author: Jianyun Cheng <ch...@outlook.com>
AuthorDate: Thu Sep 30 15:40:20 2021 +0800
[cluster-refactor] refactor client pool (#3886)
* [cluster-refactor] refactor client pool
* [cluster-refactor] refactor client pool: resolve comments
* [cluster-refactor] refactor client pool: enhance test case and resolve comments
* [cluster-refactor] refactor client pool: resolve comments-3
* [cluster-refactor] add license for new add files
* [cluster-refactor] update start class name in shell/bat script and apply
spotless
* [cluster-refactor] refactor client pool: double check all client should be return after use and resolve comments
* [cluster-refactor] refactor client pool: resolve comments
* [cluster refactor] refactor client pool: rename ClusterClient and add comments
* [cluster refactor] refactor client pool: apply spotless
* [cluster-refactor] refactor client pool: rename single master to data async append client
* apply spotless
Co-authored-by: chengjianyun <ch...@360.cn>
---
cluster/pom.xml | 4 +
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 48 ++--
.../apache/iotdb/cluster/client/BaseFactory.java | 58 +++++
.../SyncClientFactory.java => ClientCategory.java} | 32 ++-
.../apache/iotdb/cluster/client/ClientManager.java | 187 +++++++++++++++
.../iotdb/cluster/client/ClientPoolFactory.java | 98 ++++++++
.../iotdb/cluster/client/DataClientProvider.java | 98 --------
.../SyncClientFactory.java => IClientManager.java} | 23 +-
.../cluster/client/async/AsyncBaseFactory.java | 68 ++++++
.../cluster/client/async/AsyncClientFactory.java | 65 ------
.../cluster/client/async/AsyncClientPool.java | 220 -----------------
.../cluster/client/async/AsyncDataClient.java | 201 ++++++++++------
.../client/async/AsyncDataHeartbeatClient.java | 81 -------
.../cluster/client/async/AsyncMetaClient.java | 148 ++++++++----
.../client/async/AsyncMetaHeartbeatClient.java | 81 -------
.../iotdb/cluster/client/sync/SyncClientPool.java | 194 ---------------
.../iotdb/cluster/client/sync/SyncDataClient.java | 120 ++++++++--
.../client/sync/SyncDataHeartbeatClient.java | 80 -------
.../iotdb/cluster/client/sync/SyncMetaClient.java | 120 +++++++---
.../client/sync/SyncMetaHeartbeatClient.java | 77 ------
.../cluster/client/sync/TSDataServiceClient.java | 111 ---------
.../cluster/client/sync/TSMetaServiceClient.java | 109 ---------
.../iotdb/cluster/coordinator/Coordinator.java | 42 +---
.../iotdb/cluster/log/snapshot/FileSnapshot.java | 12 +-
.../cluster/log/snapshot/PullSnapshotTask.java | 5 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 149 ++++++------
.../apache/iotdb/cluster/metadata/MetaPuller.java | 75 +++---
.../iotdb/cluster/query/ClusterPlanExecutor.java | 126 +++++-----
.../cluster/query/aggregate/ClusterAggregator.java | 33 ++-
.../cluster/query/fill/ClusterPreviousFill.java | 39 ++--
.../query/groupby/RemoteGroupByExecutor.java | 60 ++---
.../query/last/ClusterLastQueryExecutor.java | 38 +--
.../cluster/query/reader/ClusterReaderFactory.java | 31 ++-
.../iotdb/cluster/query/reader/DataSourceInfo.java | 68 +++---
.../reader/RemoteSeriesReaderByTimestamp.java | 9 +-
.../query/reader/RemoteSimpleSeriesReader.java | 9 +-
.../query/reader/mult/MultDataSourceInfo.java | 47 ++--
.../query/reader/mult/RemoteMultSeriesReader.java | 22 +-
.../iotdb/cluster/server/ClusterTSServiceImpl.java | 31 +--
.../cluster/server/PullSnapshotHintService.java | 12 +-
.../cluster/server/heartbeat/HeartbeatThread.java | 5 +-
.../cluster/server/member/DataGroupMember.java | 22 +-
.../cluster/server/member/MetaGroupMember.java | 82 ++++---
.../iotdb/cluster/server/member/RaftMember.java | 161 ++++++-------
.../apache/iotdb/cluster/utils/ClientUtils.java | 64 +++--
.../iotdb/cluster/client/BaseClientTest.java | 156 +++++++++++++
.../iotdb/cluster/client/ClientManagerTest.java | 209 +++++++++++++++++
.../cluster/client/ClientPoolFactoryTest.java | 260 +++++++++++++++++++++
.../cluster/client/DataClientProviderTest.java | 242 -------------------
.../iotdb/cluster/client/MockClientManager.java} | 33 +--
.../cluster/client/async/AsyncClientPoolTest.java | 208 -----------------
.../cluster/client/async/AsyncDataClientTest.java | 110 ++++-----
.../client/async/AsyncDataHeartbeatClientTest.java | 60 -----
.../cluster/client/async/AsyncMetaClientTest.java | 106 ++++-----
.../client/async/AsyncMetaHeartbeatClientTest.java | 61 -----
.../cluster/client/sync/SyncClientPoolTest.java | 167 -------------
.../cluster/client/sync/SyncDataClientTest.java | 177 +++++++-------
.../client/sync/SyncDataHeartbeatClientTest.java | 64 -----
.../cluster/client/sync/SyncMetaClientTest.java | 171 +++++++-------
.../client/sync/SyncMetaHeartbeatClientTest.java | 64 -----
.../cluster/common/TestAsyncClientFactory.java | 55 -----
.../iotdb/cluster/common/TestAsyncMetaClient.java | 9 +-
.../cluster/common/TestSyncClientFactory.java | 88 -------
.../cluster/log/applier/DataLogApplierTest.java | 177 +++++++-------
.../iotdb/cluster/log/catchup/CatchUpTaskTest.java | 5 -
.../cluster/log/catchup/LogCatchUpTaskTest.java | 5 -
.../log/catchup/SnapshotCatchUpTaskTest.java | 5 -
.../cluster/log/snapshot/DataSnapshotTest.java | 5 -
.../cluster/log/snapshot/PullSnapshotTaskTest.java | 5 -
.../cluster/query/manage/QueryCoordinatorTest.java | 8 +-
.../cluster/query/reader/DatasourceInfoTest.java | 25 +-
.../reader/RemoteSeriesReaderByTimestampTest.java | 36 +--
.../query/reader/RemoteSimpleSeriesReaderTest.java | 27 ++-
.../mult/AssignPathManagedMergeReaderTest.java | 25 +-
.../reader/mult/RemoteMultSeriesReaderTest.java | 45 +++-
.../server/heartbeat/DataHeartbeatThreadTest.java | 5 -
.../server/heartbeat/HeartbeatThreadTest.java | 5 -
.../server/heartbeat/MetaHeartbeatThreadTest.java | 5 -
.../iotdb/cluster/server/member/BaseMember.java | 28 +--
.../cluster/server/member/DataGroupMemberTest.java | 5 -
.../cluster/server/member/MetaGroupMemberTest.java | 25 +-
pom.xml | 6 +
82 files changed, 2595 insertions(+), 3457 deletions(-)
diff --git a/cluster/pom.xml b/cluster/pom.xml
index ab1cee5..42f690d 100644
--- a/cluster/pom.xml
+++ b/cluster/pom.xml
@@ -131,6 +131,10 @@
<version>3.0.0-rc4</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ </dependency>
</dependencies>
<profiles>
<profile>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index e17eb43..40883ce 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -18,9 +18,13 @@
*/
package org.apache.iotdb.cluster;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -123,10 +127,9 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
/** hardLinkCleaner will periodically clean expired hardlinks created during snapshots */
private ScheduledExecutorService hardLinkCleanerThread;
- // currently, dataClientProvider is only used for those instances who do not belong to any
+ // currently, clientManager is only used for those instances who do not belong to any
// DataGroup..
- // TODO: however, why not let all dataGroupMembers getting clients from dataClientProvider
- private DataClientProvider dataClientProvider;
+ private IClientManager clientManager;
private ClusterIoTDB() {
// we do not init anything here, so that we can re-initialize the instance in IT.
@@ -155,7 +158,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
MetaPuller.getInstance().init(metaGroupEngine);
dataGroupEngine = new DataGroupServiceImpls(protocolFactory, metaGroupEngine);
- dataClientProvider = new DataClientProvider(protocolFactory);
+ clientManager =
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient);
initTasks();
try {
// we need to check config after initLocalEngines.
@@ -532,11 +538,6 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
});
}
- // @TestOnly
- // public void setMetaClusterServer(MetaGroupMember RaftTSMetaServiceImpl) {
- // metaServer = RaftTSMetaServiceImpl;
- // }
-
public void stop() {
deactivate();
}
@@ -568,13 +569,9 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
}
}
- public DataClientProvider getClientProvider() {
- return dataClientProvider;
- }
-
@TestOnly
- public void setClientProvider(DataClientProvider dataClientProvider) {
- this.dataClientProvider = dataClientProvider;
+ public void setClientManager(IClientManager clientManager) {
+ this.clientManager = clientManager;
}
public MetaGroupMember getMetaGroupEngine() {
@@ -635,6 +632,25 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
printClientConnectionErrorStack = false;
}
+ public SyncDataClient getSyncDataClient(Node node, int readOperationTimeoutMS) throws Exception {
+ SyncDataClient dataClient =
+ (SyncDataClient) clientManager.borrowSyncClient(node, ClientCategory.DATA);
+ if (dataClient != null) {
+ dataClient.setTimeout(readOperationTimeoutMS);
+ }
+ return dataClient;
+ }
+
+ public AsyncDataClient getAsyncDataClient(Node node, int readOperationTimeoutMS)
+ throws Exception {
+ AsyncDataClient dataClient =
+ (AsyncDataClient) clientManager.borrowAsyncClient(node, ClientCategory.DATA);
+ if (dataClient != null) {
+ dataClient.setTimeout(readOperationTimeoutMS);
+ }
+ return dataClient;
+ }
+
private static class ClusterIoTDBHolder {
private static final ClusterIoTDB INSTANCE = new ClusterIoTDB();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/BaseFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/BaseFactory.java
new file mode 100644
index 0000000..cab2873
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/BaseFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.commons.pool2.KeyedPooledObjectFactory;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public abstract class BaseFactory<K, T> implements KeyedPooledObjectFactory<K, T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(BaseFactory.class);
+
+ protected TAsyncClientManager[] managers;
+ protected TProtocolFactory protocolFactory;
+ protected AtomicInteger clientCnt = new AtomicInteger();
+ protected ClientCategory category;
+ protected IClientManager clientPoolManager;
+
+ public BaseFactory(TProtocolFactory protocolFactory, ClientCategory category) {
+ this.protocolFactory = protocolFactory;
+ this.category = category;
+ }
+
+ public BaseFactory(
+ TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
+ this.protocolFactory = protocolFactory;
+ this.category = category;
+ this.clientPoolManager = clientManager;
+ }
+
+ @Override
+ public void activateObject(K node, PooledObject<T> pooledObject) throws Exception {}
+
+ @Override
+ public void passivateObject(K node, PooledObject<T> pooledObject) throws Exception {}
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
similarity index 53%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
index fa0da62..8b9842f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientCategory.java
@@ -17,28 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.sync;
+package org.apache.iotdb.cluster.client;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+public enum ClientCategory {
+ META("MetaClient"),
+ META_HEARTBEAT("MetaHeartbeatClient"),
+ DATA("DataClient"),
+ DATA_HEARTBEAT("DataHeartbeatClient"),
+ DATA_ASYNC_APPEND_CLIENT("DataAsyncAppendClient");
-import org.apache.thrift.transport.TTransportException;
+ private String name;
-import java.io.IOException;
-
-public interface SyncClientFactory {
-
- /**
- * Get a client which will connect the given node and be cached in the given pool.
- *
- * @param node the cluster node the client will connect.
- * @param pool the pool that will cache the client for reusing.
- * @return
- * @throws IOException
- */
- RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws TTransportException;
+ ClientCategory(String name) {
+ this.name = name;
+ }
- default String nodeInfo(Node node) {
- return node.toString();
+ public String getName() {
+ return name;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
new file mode 100644
index 0000000..735cf9f
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientManager.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * One should borrow the reusable client from this manager and return the client after use. The
+ * underlying client pool is powered by Apache Commons Pool. This class provides 3 default pool
+ * groups according to current usage: RequestForwardClient, DataGroupClient, MetaGroupClient.
+ *
+ * <p>TODO: We can refine the client structure by reorganizing the cluster-thrift interfaces.
+ */
+public class ClientManager implements IClientManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientManager.class);
+
+ private Map<ClientCategory, KeyedObjectPool<Node, RaftService.AsyncClient>> asyncClientPoolMap;
+ private Map<ClientCategory, KeyedObjectPool<Node, RaftService.Client>> syncClientPoolMap;
+ private ClientPoolFactory clientPoolFactory;
+
+ /**
+ * {@link ClientManager.Type#RequestForwardClient} represents the clients used to forward external
+ * client requests to proper node to handle such as query, insert request.
+ *
+ * <p>{@link ClientManager.Type#DataGroupClient} represents the clients used to appendEntry,
+ * appendEntries, sendHeartbeat, etc for data raft group.
+ *
+ * <p>{@link ClientManager.Type#MetaGroupClient} represents the clients used to appendEntry,
+ * appendEntries, sendHeartbeat, etc. for the meta raft group.
+ */
+ public enum Type {
+ RequestForwardClient,
+ DataGroupClient,
+ MetaGroupClient
+ }
+
+ public ClientManager(boolean isAsyncMode, Type type) {
+ clientPoolFactory = new ClientPoolFactory();
+ clientPoolFactory.setClientManager(this);
+ if (isAsyncMode) {
+ asyncClientPoolMap = Maps.newHashMap();
+ constructAsyncClientMap(type);
+ } else {
+ syncClientPoolMap = Maps.newHashMap();
+ constructSyncClientMap(type);
+ }
+ }
+
+ private void constructAsyncClientMap(Type type) {
+ switch (type) {
+ /**
+ * Requests from external clients are forwarded via the data group port, so the category is
+ * {@link ClientCategory#DATA}.
+ */
+ case RequestForwardClient:
+ asyncClientPoolMap.put(
+ ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+ break;
+ case MetaGroupClient:
+ asyncClientPoolMap.put(
+ ClientCategory.META, clientPoolFactory.createAsyncMetaPool(ClientCategory.META));
+ asyncClientPoolMap.put(
+ ClientCategory.META_HEARTBEAT,
+ clientPoolFactory.createAsyncMetaPool(ClientCategory.META_HEARTBEAT));
+ break;
+ case DataGroupClient:
+ asyncClientPoolMap.put(
+ ClientCategory.DATA, clientPoolFactory.createAsyncDataPool(ClientCategory.DATA));
+ asyncClientPoolMap.put(
+ ClientCategory.DATA_HEARTBEAT,
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA_HEARTBEAT));
+ asyncClientPoolMap.put(
+ ClientCategory.DATA_ASYNC_APPEND_CLIENT,
+ clientPoolFactory.createSingleManagerAsyncDataPool());
+ break;
+ default:
+ logger.warn("unsupported ClientManager type: {}", type);
+ break;
+ }
+ }
+
+ private void constructSyncClientMap(Type type) {
+ switch (type) {
+ /**
+ * Requests from external clients are forwarded via the data group port, so the category is
+ * {@link ClientCategory#DATA}.
+ */
+ case RequestForwardClient:
+ syncClientPoolMap.put(
+ ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));
+ break;
+ case MetaGroupClient:
+ syncClientPoolMap.put(
+ ClientCategory.META, clientPoolFactory.createSyncMetaPool(ClientCategory.META));
+ syncClientPoolMap.put(
+ ClientCategory.META_HEARTBEAT,
+ clientPoolFactory.createSyncMetaPool(ClientCategory.META_HEARTBEAT));
+ break;
+ case DataGroupClient:
+ syncClientPoolMap.put(
+ ClientCategory.DATA, clientPoolFactory.createSyncDataPool(ClientCategory.DATA));
+ syncClientPoolMap.put(
+ ClientCategory.DATA_HEARTBEAT,
+ clientPoolFactory.createSyncDataPool(ClientCategory.DATA_HEARTBEAT));
+ break;
+ default:
+ logger.warn("unsupported ClientManager type: {}", type);
+ break;
+ }
+ }
+
+ /**
+ * It's safe to convert: 1. RaftService.AsyncClient to TSDataService.AsyncClient when category is
+ * DATA or DATA_HEARTBEAT; 2. RaftService.AsyncClient to TSMetaService.AsyncClient when category
+ * is META or META_HEARTBEAT.
+ *
+ * @param category
+ * @return RaftService.AsyncClient
+ */
+ @Override
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws Exception {
+ return asyncClientPoolMap.get(category).borrowObject(node);
+ }
+
+ /**
+ * It's safe to convert: 1. RaftService.Client to TSDataService.Client when category is DATA or
+ * DATA_HEARTBEAT; 2. RaftService.Client to TSMetaService.Client when category is META or
+ * META_HEARTBEAT.
+ *
+ * @param category
+ * @return RaftService.Client
+ */
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) throws Exception {
+ return syncClientPoolMap.get(category).borrowObject(node);
+ }
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {
+ if (client != null && node != null) {
+ try {
+ asyncClientPoolMap.get(category).returnObject(node, client);
+ } catch (Exception e) {
+ logger.error("AsyncClient return error: {}", client, e);
+ }
+ }
+ }
+
+ @Override
+ public void returnSyncClient(RaftService.Client client, Node node, ClientCategory category) {
+ if (client != null && node != null) {
+ try {
+ syncClientPoolMap.get(category).returnObject(node, client);
+ } catch (Exception e) {
+ logger.error("SyncClient return error: {}", client, e);
+ }
+ }
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
new file mode 100644
index 0000000..0e837e8
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/ClientPoolFactory.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+
+import java.time.Duration;
+
+public class ClientPoolFactory {
+
+ protected long waitClientTimeoutMS;
+ protected int maxConnectionForEachNode;
+ private TProtocolFactory protocolFactory;
+ private GenericKeyedObjectPoolConfig poolConfig;
+ private IClientManager clientManager;
+
+ public ClientPoolFactory() {
+ ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
+ this.waitClientTimeoutMS = config.getWaitClientTimeoutMS();
+ this.maxConnectionForEachNode = config.getMaxClientPerNodePerMember();
+ protocolFactory =
+ config.isRpcThriftCompressionEnabled()
+ ? new TCompactProtocol.Factory()
+ : new TBinaryProtocol.Factory();
+ poolConfig = new GenericKeyedObjectPoolConfig();
+ poolConfig.setMaxTotalPerKey(maxConnectionForEachNode);
+ poolConfig.setMaxWait(Duration.ofMillis(waitClientTimeoutMS));
+ poolConfig.setTestOnReturn(true);
+ poolConfig.setTestOnBorrow(true);
+ }
+
+ public void setClientManager(IClientManager clientManager) {
+ this.clientManager = clientManager;
+ }
+
+ public GenericKeyedObjectPool<Node, RaftService.Client> createSyncDataPool(
+ ClientCategory category) {
+ return new GenericKeyedObjectPool<>(
+ new SyncDataClient.SyncDataClientFactory(protocolFactory, category, clientManager),
+ poolConfig);
+ }
+
+ public GenericKeyedObjectPool<Node, RaftService.Client> createSyncMetaPool(
+ ClientCategory category) {
+ return new GenericKeyedObjectPool<>(
+ new SyncMetaClient.SyncMetaClientFactory(protocolFactory, category, clientManager),
+ poolConfig);
+ }
+
+ public GenericKeyedObjectPool<Node, RaftService.AsyncClient> createAsyncDataPool(
+ ClientCategory category) {
+ return new GenericKeyedObjectPool<>(
+ new AsyncDataClient.AsyncDataClientFactory(protocolFactory, category, clientManager),
+ poolConfig);
+ }
+
+ public GenericKeyedObjectPool<Node, RaftService.AsyncClient> createAsyncMetaPool(
+ ClientCategory category) {
+ return new GenericKeyedObjectPool<>(
+ new AsyncMetaClient.AsyncMetaClientFactory(protocolFactory, category, clientManager),
+ poolConfig);
+ }
+
+ public GenericKeyedObjectPool<Node, RaftService.AsyncClient> createSingleManagerAsyncDataPool() {
+ return new GenericKeyedObjectPool<>(
+ new AsyncDataClient.SingleManagerFactory(protocolFactory, clientManager), poolConfig);
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
deleted file mode 100644
index 4f189f1..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client;
-
-import org.apache.iotdb.cluster.client.async.AsyncClientPool;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.Factory;
-import org.apache.iotdb.cluster.client.sync.SyncClientPool;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.db.utils.TestOnly;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-import java.io.IOException;
-
-public class DataClientProvider {
-
- /**
- * dataClientPool provides reusable thrift clients to connect to the DataGroupMembers of other
- * nodes
- */
- private AsyncClientPool dataAsyncClientPool;
-
- private SyncClientPool dataSyncClientPool;
-
- public DataClientProvider(TProtocolFactory factory) {
- if (!ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- dataSyncClientPool = new SyncClientPool(new SyncDataClient.Factory(factory));
- } else {
- dataAsyncClientPool = new AsyncClientPool(new Factory(factory));
- }
- }
-
- @TestOnly
- AsyncClientPool getDataAsyncClientPool() {
- return dataAsyncClientPool;
- }
-
- @TestOnly
- SyncClientPool getDataSyncClientPool() {
- return dataSyncClientPool;
- }
-
- /**
- * Get a thrift client that will connect to "node" using the data port.
- *
- * @param node the node to be connected
- * @param timeout timeout threshold of connection
- */
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
- AsyncDataClient client = (AsyncDataClient) dataAsyncClientPool.getClient(node);
- if (client == null) {
- throw new IOException("can not get client for node=" + node);
- }
- client.setTimeout(timeout);
- return client;
- }
-
- /**
- * IMPORTANT!!! After calling this function, the caller should make sure to call {@link
- * org.apache.iotdb.cluster.utils.ClientUtils#putBackSyncClient(Client)} to put the client back
- * into the client pool, otherwise there is a risk of client leakage.
- *
- * <p>Get a thrift client that will connect to "node" using the data port.
- *
- * @param node the node to be connected
- * @param timeout timeout threshold of connection
- */
- public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
- SyncDataClient client = (SyncDataClient) dataSyncClientPool.getClient(node);
- if (client == null) {
- throw new TException("can not get client for node=" + node);
- }
- client.setTimeout(timeout);
- return client;
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/IClientManager.java
similarity index 60%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
copy to cluster/src/main/java/org/apache/iotdb/cluster/client/IClientManager.java
index fa0da62..5e2add2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/IClientManager.java
@@ -17,28 +17,17 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.sync;
+package org.apache.iotdb.cluster.client;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.thrift.transport.TTransportException;
+public interface IClientManager {
+ RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category) throws Exception;
-import java.io.IOException;
+ RaftService.Client borrowSyncClient(Node node, ClientCategory category) throws Exception;
-public interface SyncClientFactory {
+ void returnAsyncClient(RaftService.AsyncClient client, Node node, ClientCategory category);
- /**
- * Get a client which will connect the given node and be cached in the given pool.
- *
- * @param node the cluster node the client will connect.
- * @param pool the pool that will cache the client for reusing.
- * @return
- * @throws IOException
- */
- RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws TTransportException;
-
- default String nodeInfo(Node node) {
- return node.toString();
- }
+ void returnSyncClient(RaftService.Client client, Node node, ClientCategory category);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncBaseFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncBaseFactory.java
new file mode 100644
index 0000000..220ff7c
--- /dev/null
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncBaseFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client.async;
+
+import org.apache.iotdb.cluster.client.BaseFactory;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.apache.thrift.async.TAsyncClientManager;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Base for async client factories; fills managers with one TAsyncClientManager per selector. */
+public abstract class AsyncBaseFactory<K, T extends RaftService.AsyncClient>
+ extends BaseFactory<K, T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(AsyncBaseFactory.class);
+
+ public AsyncBaseFactory(TProtocolFactory protocolFactory, ClientCategory category) {
+ super(protocolFactory, category);
+ managers = createSelectorManagers();
+ }
+
+ public AsyncBaseFactory(
+ TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
+ super(protocolFactory, category, clientManager);
+ managers = createSelectorManagers();
+ }
+
+ /**
+ * Builds one manager per configured selector (getSelectorNumOfClientPool). A slot whose
+ * manager cannot be created is logged and left null.
+ */
+ private static TAsyncClientManager[] createSelectorManagers() {
+ int selectorNum = ClusterDescriptor.getInstance().getConfig().getSelectorNumOfClientPool();
+ TAsyncClientManager[] created = new TAsyncClientManager[selectorNum];
+ for (int i = 0; i < created.length; i++) {
+ try {
+ created[i] = new TAsyncClientManager();
+ } catch (IOException e) {
+ logger.error("Cannot create async client manager {} for factory", i, e);
+ }
+ }
+ return created;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientFactory.java
deleted file mode 100644
index a96364d..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientFactory.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-
-import org.apache.thrift.async.TAsyncClientManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public abstract class AsyncClientFactory {
-
- private static final Logger logger = LoggerFactory.getLogger(AsyncClientFactory.class);
- static TAsyncClientManager[] managers;
- org.apache.thrift.protocol.TProtocolFactory protocolFactory;
- AtomicInteger clientCnt = new AtomicInteger();
-
- static {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- managers =
- new TAsyncClientManager
- [ClusterDescriptor.getInstance().getConfig().getSelectorNumOfClientPool()];
- for (int i = 0; i < managers.length; i++) {
- try {
- managers[i] = new TAsyncClientManager();
- } catch (IOException e) {
- logger.error("Cannot create data heartbeat client manager for factory", e);
- }
- }
- }
- }
-
- /**
- * Get a client which will connect the given node and be cached in the given pool.
- *
- * @param node the cluster node the client will connect.
- * @param pool the pool that will cache the client for reusing.
- * @return
- * @throws IOException
- */
- protected abstract RaftService.AsyncClient getAsyncClient(Node node, AsyncClientPool pool)
- throws IOException;
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
deleted file mode 100644
index 2aa8d04..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncClientPool.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.utils.ClusterNode;
-import org.apache.iotdb.db.utils.TestOnly;
-
-import org.apache.thrift.async.TAsyncMethodCall;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class AsyncClientPool {
-
- private static final Logger logger = LoggerFactory.getLogger(AsyncClientPool.class);
- private long waitClientTimeutMS;
- private int maxConnectionForEachNode;
- private Map<ClusterNode, Deque<AsyncClient>> clientCaches = new ConcurrentHashMap<>();
- private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
- private AsyncClientFactory asyncClientFactory;
-
- // TODO fix me: better to throw exception if the client can not be get. Then we can remove this
- // field.
- public static boolean printStack;
-
- public AsyncClientPool(AsyncClientFactory asyncClientFactory) {
- this.asyncClientFactory = asyncClientFactory;
- this.waitClientTimeutMS = ClusterDescriptor.getInstance().getConfig().getWaitClientTimeoutMS();
- this.maxConnectionForEachNode =
- ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
- }
-
- /**
- * See getClient(Node node, boolean activatedOnly)
- *
- * @param node
- * @return
- * @throws IOException
- */
- public AsyncClient getClient(Node node) throws IOException {
- return getClient(node, true);
- }
-
- /**
- * Get a client of the given node from the cache if one is available, or create a new one.
- *
- * <p>IMPORTANT!!! The caller should check whether the return value is null or not!
- *
- * @param node the node want to connect
- * @param activatedOnly if true, only return a client if the node's NodeStatus.isActivated ==
- * true, which avoid unnecessary wait for already down nodes, but heartbeat attempts should
- * always try to connect so the node can be reactivated ASAP
- * @return if the node can connect, return the client, otherwise null
- * @throws IOException if the node can not be connected
- */
- public AsyncClient getClient(Node node, boolean activatedOnly) throws IOException {
- ClusterNode clusterNode = new ClusterNode(node);
- if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
- return null;
- }
-
- AsyncClient client;
- // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (clientStack) {
- if (clientStack.isEmpty()) {
- int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
- if (nodeClientNum >= maxConnectionForEachNode) {
- client = waitForClient(clientStack, clusterNode);
- } else {
- client = asyncClientFactory.getAsyncClient(clusterNode, this);
- nodeClientNumMap.compute(
- clusterNode,
- (n, oldValue) -> {
- if (oldValue == null) return 1;
- return oldValue + 1;
- });
- }
- } else {
- client = clientStack.pop();
- }
- }
- return client;
- }
-
- /**
- * Wait for a client to be returned for at most WAIT_CLIENT_TIMEOUT_MS milliseconds. If no client
- * is returned beyond the timeout, a new client will be returned. WARNING: the caller must
- * synchronize on the pool.
- *
- * @param clientStack
- * @param clusterNode
- * @return
- * @throws IOException
- */
- @SuppressWarnings({"squid:S2273"}) // synchronized outside
- private AsyncClient waitForClient(Deque<AsyncClient> clientStack, ClusterNode clusterNode)
- throws IOException {
- // wait for an available client
- long waitStart = System.currentTimeMillis();
- while (clientStack.isEmpty()) {
- try {
- clientStack.wait(waitClientTimeutMS);
- if (clientStack.isEmpty() && System.currentTimeMillis() - waitStart >= waitClientTimeutMS) {
- logger.warn(
- "{} Cannot get an available client after {}ms, create a new one.",
- asyncClientFactory,
- waitClientTimeutMS);
- AsyncClient asyncClient = asyncClientFactory.getAsyncClient(clusterNode, this);
- nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
- return asyncClient;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Interrupted when waiting for an available client of {}", clusterNode);
- return null;
- }
- }
- return clientStack.pop();
- }
-
- /**
- * Return a client of a node to the pool. Closed client should not be returned.
- *
- * @param node
- * @param client
- */
- public void putClient(Node node, AsyncClient client) {
- ClusterNode clusterNode = new ClusterNode(node);
- TAsyncMethodCall<?> call = null;
- if (client instanceof AsyncDataClient) {
- call = ((AsyncDataClient) client).getCurrMethod();
- } else if (client instanceof AsyncMetaClient) {
- call = ((AsyncMetaClient) client).getCurrMethod();
- }
- if (call != null) {
- logger.warn("A using client {} is put back while running {}", client.hashCode(), call);
- }
- // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (clientStack) {
- clientStack.push(client);
- clientStack.notifyAll();
- }
- }
-
- void onError(Node node) {
- ClusterNode clusterNode = new ClusterNode(node);
- // clean all cached clients when network fails
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (clientStack) {
- while (!clientStack.isEmpty()) {
- AsyncClient client = clientStack.pop();
- if (client instanceof AsyncDataClient) {
- ((AsyncDataClient) client).close();
- } else if (client instanceof AsyncMetaClient) {
- ((AsyncMetaClient) client).close();
- }
- }
- nodeClientNumMap.put(clusterNode, 0);
- clientStack.notifyAll();
- NodeStatusManager.getINSTANCE().deactivate(node);
- }
- }
-
- @SuppressWarnings("squid:S1135")
- void onComplete(Node node) {
- NodeStatusManager.getINSTANCE().activate(node);
- }
-
- void recreateClient(Node node) {
- ClusterNode clusterNode = new ClusterNode(node);
- Deque<AsyncClient> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (clientStack) {
- try {
- AsyncClient asyncClient = asyncClientFactory.getAsyncClient(node, this);
- clientStack.push(asyncClient);
- } catch (IOException e) {
- logger.error("Cannot create a new client for {}", node, e);
- nodeClientNumMap.computeIfPresent(clusterNode, (n, cnt) -> cnt - 1);
- }
- clientStack.notifyAll();
- }
- }
-
- @TestOnly
- public Map<ClusterNode, Integer> getNodeClientNumMap() {
- return nodeClientNumMap;
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index c4ce231..ed10619 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -19,13 +19,18 @@
package org.apache.iotdb.cluster.client.async;
+import org.apache.iotdb.cluster.client.BaseFactory;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncClient;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -34,21 +39,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Date;
/**
* Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
*/
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class AsyncDataClient extends AsyncClient {
+public class AsyncDataClient extends TSDataService.AsyncClient {
private static final Logger logger = LoggerFactory.getLogger(AsyncDataClient.class);
- Node node;
- AsyncClientPool pool;
+ private Node node;
+ private ClientCategory category;
+ private IClientManager clientManager;
+ @TestOnly
public AsyncDataClient(
TProtocolFactory protocolFactory,
TAsyncClientManager clientManager,
@@ -58,104 +62,161 @@ public class AsyncDataClient extends AsyncClient {
public AsyncDataClient(
TProtocolFactory protocolFactory,
- TAsyncClientManager clientManager,
+ TAsyncClientManager tClientManager,
Node node,
- AsyncClientPool pool)
+ ClientCategory category)
throws IOException {
// the difference of the two clients lies in the port
super(
protocolFactory,
- clientManager,
+ tClientManager,
TNonblockingSocketWrapper.wrap(
- node.getInternalIp(), node.getDataPort(), ClusterConstant.getConnectionTimeoutInMS()));
+ node.getInternalIp(),
+ ClientUtils.getPort(node, category),
+ ClusterConstant.getConnectionTimeoutInMS()));
this.node = node;
- this.pool = pool;
+ this.category = category;
+ }
+
+ /**
+ * Builds the client exactly as the four-argument constructor does, then remembers the manager
+ * that returnSelf() uses to hand this client back.
+ *
+ * @param manager receiver of returnAsyncClient calls; when null, returnSelf() returns nothing
+ * @throws IOException propagated from the delegated constructor
+ */
+ public AsyncDataClient(
+ TProtocolFactory protocolFactory,
+ TAsyncClientManager tClientManager,
+ Node node,
+ ClientCategory category,
+ IClientManager manager)
+ throws IOException {
+ this(protocolFactory, tClientManager, node, category);
+ this.clientManager = manager;
+ }
+
+ /** Closes ___transport and resets ___currentMethod to null. */
+ public void close() {
+ ___transport.close();
+ ___currentMethod = null;
+ }
+
+ /**
+ * Reports whether this client holds a transport, i.e. ___transport is non-null.
+ * NOTE(review): close() closes ___transport without nulling it, so this presumably still
+ * returns true for a closed client -- confirm that is what pool validation expects.
+ */
+ public boolean isValid() {
+ return ___transport != null;
+ }
+
+ /**
+ * Return this client through clientManager when one was supplied. Users do not need to call
+ * this method; onComplete() invokes it. The client is logged via a {} placeholder at debug.
+ */
+ private void returnSelf() {
+ logger.debug("return client: {}", this);
+ if (clientManager != null) clientManager.returnAsyncClient(this, node, category);
+ }
@Override
public void onComplete() {
super.onComplete();
- // return itself to the pool if the job is done
- if (pool != null) {
- pool.putClient(node, this);
- pool.onComplete(node);
- }
+ returnSelf();
+ // TODO: active node status
}
- @SuppressWarnings("squid:S1135")
@Override
- public void onError(Exception e) {
- super.onError(e);
- if (pool != null) {
- pool.recreateClient(node);
- // TODO: if e instance of network failure
- pool.onError(node);
+ public String toString() {
+ // Renders as Async<category name>{node=<node>,port=<port resolved for node and category>}
+ return new StringBuilder("Async")
+ .append(category.getName())
+ .append("{node=")
+ .append(node)
+ .append(",port=")
+ .append(ClientUtils.getPort(node, category))
+ .append('}')
+ .toString();
+ }
+
+ /** Returns the node that was passed to this client's constructor. */
+ public Node getNode() {
+ return node;
+ }
+
+ public boolean isReady() {
+ try {
+ checkReady();
+ return true;
+ } catch (Exception e) {
+ return false;
}
}
- public void close() {
- ___transport.close();
- ___currentMethod = null;
+ @TestOnly
+ TAsyncMethodCall<Object> getCurrMethod() {
+ return ___currentMethod;
}
- public static class Factory extends AsyncClientFactory {
+ public static class AsyncDataClientFactory extends AsyncBaseFactory<Node, AsyncDataClient> {
- public Factory(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
+ public AsyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
+ super(protocolFactory, category);
+ }
+
+ public AsyncDataClientFactory(
+ TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
+ super(protocolFactory, category, clientManager);
+ }
+
+ @Override
+ public void destroyObject(Node node, PooledObject<AsyncDataClient> pooledObject)
+ throws Exception {
+ pooledObject.getObject().close();
}
@Override
- public RaftService.AsyncClient getAsyncClient(Node node, AsyncClientPool pool)
- throws IOException {
+ public PooledObject<AsyncDataClient> makeObject(Node node) throws Exception {
TAsyncClientManager manager = managers[clientCnt.incrementAndGet() % managers.length];
manager = manager == null ? new TAsyncClientManager() : manager;
- return new AsyncDataClient(protocolFactory, manager, node, pool);
+ return new DefaultPooledObject<>(
+ new AsyncDataClient(protocolFactory, manager, node, category, clientPoolManager));
}
- }
- public static class SingleManagerFactory extends AsyncClientFactory {
+ @Override
+ public boolean validateObject(Node node, PooledObject<AsyncDataClient> pooledObject) {
+ return pooledObject.getObject() != null && pooledObject.getObject().isValid();
+ }
+ }
- private TAsyncClientManager manager;
+ public static class SingleManagerFactory extends BaseFactory<Node, AsyncDataClient> {
- public SingleManagerFactory(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- try {
- manager = new TAsyncClientManager();
- } catch (IOException e) {
- logger.error("Cannot init manager of SingleThreadFactoryAsync", e);
- }
+ public SingleManagerFactory(TProtocolFactory protocolFactory) {
+ super(protocolFactory, ClientCategory.DATA);
+ managers = new TAsyncClientManager[1];
+ try {
+ managers[0] = new TAsyncClientManager();
+ } catch (IOException e) {
+ logger.error("Cannot create data heartbeat client manager for factory", e);
}
}
- @Override
- public RaftService.AsyncClient getAsyncClient(Node node, AsyncClientPool pool)
- throws IOException {
- return new AsyncDataClient(protocolFactory, manager, node, pool);
+ public SingleManagerFactory(TProtocolFactory protocolFactory, IClientManager clientManager) {
+ this(protocolFactory);
+ this.clientPoolManager = clientManager;
}
- }
- @Override
- public String toString() {
- return "DataClient{" + "node=" + node + '}';
- }
+ @Override
+ public void activateObject(Node node, PooledObject<AsyncDataClient> pooledObject)
+ throws Exception {}
- public Node getNode() {
- return node;
- }
+ @Override
+ public void destroyObject(Node node, PooledObject<AsyncDataClient> pooledObject)
+ throws Exception {
+ pooledObject.getObject().close();
+ }
- public boolean isReady() {
- if (___currentMethod != null) {
- logger.warn(
- "Client {} is running {} and will timeout at {}",
- hashCode(),
- ___currentMethod,
- new Date(___currentMethod.getTimeoutTimestamp()));
+ @Override
+ public PooledObject<AsyncDataClient> makeObject(Node node) throws Exception {
+ return new DefaultPooledObject<>(
+ new AsyncDataClient(
+ protocolFactory, managers[0], node, ClientCategory.DATA, clientPoolManager));
}
- return ___currentMethod == null && !hasError();
- }
- TAsyncMethodCall<Object> getCurrMethod() {
- return ___currentMethod;
+ @Override
+ public void passivateObject(Node node, PooledObject<AsyncDataClient> pooledObject)
+ throws Exception {}
+
+ @Override
+ public boolean validateObject(Node node, PooledObject<AsyncDataClient> pooledObject) {
+ return pooledObject.getObject() != null && pooledObject.getObject().isValid();
+ }
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
deleted file mode 100644
index 0fbf41e..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-import java.io.IOException;
-
-/**
- * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
- */
-public class AsyncDataHeartbeatClient extends AsyncDataClient {
-
- private AsyncDataHeartbeatClient(
- TProtocolFactory protocolFactory,
- TAsyncClientManager clientManager,
- Node node,
- AsyncClientPool pool)
- throws IOException {
- super(
- protocolFactory,
- clientManager,
- TNonblockingSocketWrapper.wrap(
- node.getInternalIp(),
- node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getConnectionTimeoutInMS()));
- this.node = node;
- this.pool = pool;
- }
-
- public static class Factory extends AsyncClientFactory {
-
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public RaftService.AsyncClient getAsyncClient(Node node, AsyncClientPool pool)
- throws IOException {
- TAsyncClientManager manager = managers[clientCnt.incrementAndGet() % managers.length];
- manager = manager == null ? new TAsyncClientManager() : manager;
- return new AsyncDataHeartbeatClient(protocolFactory, manager, node, pool);
- }
- }
-
- @Override
- public String toString() {
- return "AsyncDataHeartbeatClient{"
- + "node="
- + super.getNode()
- + ","
- + "dataHeartbeatPort="
- + (super.getNode().getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET)
- + '}';
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
index f6fb94d..8e85414 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
@@ -19,33 +19,33 @@
package org.apache.iotdb.cluster.client.async;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TNonblockingTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Date;
/**
* Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
*/
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class AsyncMetaClient extends AsyncClient {
+public class AsyncMetaClient extends TSMetaService.AsyncClient {
- private static final Logger logger = LoggerFactory.getLogger(AsyncMetaClient.class);
- Node node;
- AsyncClientPool pool;
+ private Node node;
+ private ClientCategory category;
+ private IClientManager clientManager;
public AsyncMetaClient(
TProtocolFactory protocolFactory,
@@ -58,55 +58,57 @@ public class AsyncMetaClient extends AsyncClient {
TProtocolFactory protocolFactory,
TAsyncClientManager clientManager,
Node node,
- AsyncClientPool pool)
+ ClientCategory category)
throws IOException {
// the difference of the two clients lies in the port
super(
protocolFactory,
clientManager,
TNonblockingSocketWrapper.wrap(
- node.getInternalIp(), node.getMetaPort(), ClusterConstant.getConnectionTimeoutInMS()));
+ node.getInternalIp(),
+ ClientUtils.getPort(node, category),
+ ClusterConstant.getConnectionTimeoutInMS()));
this.node = node;
- this.pool = pool;
+ this.category = category;
}
- @Override
- public void onComplete() {
- super.onComplete();
- // return itself to the pool if the job is done
- if (pool != null) {
- pool.putClient(node, this);
- pool.onComplete(node);
- }
+ public AsyncMetaClient(
+ TProtocolFactory protocolFactory,
+ TAsyncClientManager clientManager,
+ Node node,
+ ClientCategory category,
+ IClientManager manager)
+ throws IOException {
+ this(protocolFactory, clientManager, node, category);
+ this.clientManager = manager;
}
- @SuppressWarnings("squid:S1135")
- @Override
- public void onError(Exception e) {
- super.onError(e);
- pool.recreateClient(node);
- // TODO: if e instance of network failure
- pool.onError(node);
+ /**
+ * Give this client back through clientManager if one was set; onComplete() does this for you.
+ */
+ public void returnSelf() {
+ if (clientManager == null) return;
+ clientManager.returnAsyncClient(this, node, category);
+ }
- public static class Factory extends AsyncClientFactory {
-
- public Factory(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public RaftService.AsyncClient getAsyncClient(Node node, AsyncClientPool pool)
- throws IOException {
- TAsyncClientManager manager = managers[clientCnt.incrementAndGet() % managers.length];
- manager = manager == null ? new TAsyncClientManager() : manager;
- return new AsyncMetaClient(protocolFactory, manager, node, pool);
- }
+ /**
+ * Runs the superclass completion handling first, then hands this client back via returnSelf().
+ */
+ @Override
+ public void onComplete() {
+ super.onComplete();
+ returnSelf();
+ // TODO: active node status
+ }
@Override
public String toString() {
- return "MetaClient{" + "node=" + node + '}';
+ return "Async"
+ + category.getName()
+ + "{"
+ + "node="
+ + node
+ + ","
+ + "port="
+ + ClientUtils.getPort(node, category)
+ + '}';
}
public void close() {
@@ -118,18 +120,62 @@ public class AsyncMetaClient extends AsyncClient {
return node;
}
+ @TestOnly
public boolean isReady() {
- if (___currentMethod != null) {
- logger.warn(
- "Client {} is running {} and will timeout at {}",
- hashCode(),
- ___currentMethod,
- new Date(___currentMethod.getTimeoutTimestamp()));
+ try {
+ checkReady();
+ return true;
+ } catch (Exception e) {
+ return false;
}
- return ___currentMethod == null;
}
+ /**
+ * Reports whether this client holds a transport, i.e. ___transport is non-null.
+ * NOTE(review): this does not look at whether the transport is still open -- confirm that a
+ * closed client is meant to pass pool validation.
+ */
+ public boolean isValid() {
+ return ___transport != null;
+ }
+
+ @TestOnly
TAsyncMethodCall<Object> getCurrMethod() {
return ___currentMethod;
}
+
+ /** Factory that creates, activates, validates and destroys pooled AsyncMetaClient objects. */
+ public static class AsyncMetaClientFactory extends AsyncBaseFactory<Node, AsyncMetaClient> {
+
+ public AsyncMetaClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
+ super(protocolFactory, category);
+ }
+
+ public AsyncMetaClientFactory(
+ TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
+ super(protocolFactory, category, clientManager);
+ }
+
+ /** Applies the configured connection timeout to the pooled client. */
+ @Override
+ public void activateObject(Node node, PooledObject<AsyncMetaClient> pooledObject) {
+ pooledObject.getObject().setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ }
+
+ /** Closes the pooled client. */
+ @Override
+ public void destroyObject(Node node, PooledObject<AsyncMetaClient> pooledObject)
+ throws Exception {
+ pooledObject.getObject().close();
+ }
+
+ /**
+ * Creates a client for the given node, picking the selector manager by a running counter so
+ * that successive clients rotate over the managers array.
+ */
+ @Override
+ public PooledObject<AsyncMetaClient> makeObject(Node node) throws Exception {
+ // floorMod keeps the index non-negative after clientCnt wraps past Integer.MAX_VALUE; a
+ // plain % would then produce a negative index and an ArrayIndexOutOfBoundsException
+ int slot = Math.floorMod(clientCnt.incrementAndGet(), managers.length);
+ TAsyncClientManager manager = managers[slot];
+ // a slot is null when its manager could not be created in the factory constructor
+ manager = manager == null ? new TAsyncClientManager() : manager;
+ return new DefaultPooledObject<>(
+ new AsyncMetaClient(protocolFactory, manager, node, category, clientPoolManager));
+ }
+
+ /** Intentionally empty: nothing is reset when a client goes back to the pool. */
+ @Override
+ public void passivateObject(Node node, PooledObject<AsyncMetaClient> pooledObject)
+ throws Exception {}
+
+ /**
+ * Accepts a pooled entry only when both the wrapper and the wrapped client are present and
+ * the client reports isValid(); a missing client yields false instead of an exception.
+ */
+ @Override
+ public boolean validateObject(Node node, PooledObject<AsyncMetaClient> pooledObject) {
+ return pooledObject != null
+ && pooledObject.getObject() != null
+ && pooledObject.getObject().isValid();
+ }
+ }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
deleted file mode 100644
index cf0d3a2..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-import java.io.IOException;
-
-/**
- * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
- */
-public class AsyncMetaHeartbeatClient extends AsyncMetaClient {
-
- private AsyncMetaHeartbeatClient(
- TProtocolFactory protocolFactory,
- TAsyncClientManager clientManager,
- Node node,
- AsyncClientPool pool)
- throws IOException {
- super(
- protocolFactory,
- clientManager,
- TNonblockingSocketWrapper.wrap(
- node.getInternalIp(),
- node.getMetaPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getConnectionTimeoutInMS()));
- this.node = node;
- this.pool = pool;
- }
-
- public static class Factory extends AsyncClientFactory {
-
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public RaftService.AsyncClient getAsyncClient(Node node, AsyncClientPool pool)
- throws IOException {
- TAsyncClientManager manager = managers[clientCnt.incrementAndGet() % managers.length];
- manager = manager == null ? new TAsyncClientManager() : manager;
- return new AsyncMetaHeartbeatClient(protocolFactory, manager, node, pool);
- }
- }
-
- @Override
- public String toString() {
- return "AsyncMetaHeartbeatClient{"
- + "node="
- + super.getNode()
- + ","
- + "metaHeartbeatPort="
- + (super.getNode().getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET)
- + '}';
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
deleted file mode 100644
index 661161e..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientPool.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
-import org.apache.iotdb.cluster.utils.ClusterNode;
-import org.apache.iotdb.db.utils.TestOnly;
-
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.Deque;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class SyncClientPool {
-
- private static final Logger logger = LoggerFactory.getLogger(SyncClientPool.class);
- private long waitClientTimeoutMS;
- private int maxConnectionForEachNode;
- // TODO should we really need a Node here? or just using its ID?
- private Map<ClusterNode, Deque<RaftService.Client>> clientCaches = new ConcurrentHashMap<>();
- private Map<ClusterNode, Integer> nodeClientNumMap = new ConcurrentHashMap<>();
- private SyncClientFactory syncClientFactory;
-
- public SyncClientPool(SyncClientFactory syncClientFactory) {
- this.syncClientFactory = syncClientFactory;
- this.waitClientTimeoutMS = ClusterDescriptor.getInstance().getConfig().getWaitClientTimeoutMS();
- this.maxConnectionForEachNode =
- ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
- }
-
- /**
- * See getClient(Node node, boolean activatedOnly)
- *
- * @param node the node want to connect
- * @return if the node can connect, return the client, otherwise null
- */
- public RaftService.Client getClient(Node node) {
- return getClient(node, true);
- }
-
- /**
- * Get a client of the given node from the cache if one is available, or create a new one.
- *
- * <p>IMPORTANT!!! The caller should check whether the return value is null or not!
- *
- * @param node the node want to connect
- * @param activatedOnly if true, only return a client if the node's NodeStatus.isActivated ==
- * true, which avoid unnecessary wait for already down nodes, but heartbeat attempts should
- * always try to connect so the node can be reactivated ASAP
- * @return if the node can connect, return the client, otherwise null
- */
- public RaftService.Client getClient(Node node, boolean activatedOnly) {
- if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
- return null;
- }
-
- ClusterNode clusterNode = new ClusterNode(node);
- // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<RaftService.Client> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
-
- synchronized (clientStack) {
- if (clientStack.isEmpty()) {
- int nodeClientNum = nodeClientNumMap.getOrDefault(clusterNode, 0);
- if (nodeClientNum >= maxConnectionForEachNode) {
- return waitForClient(clientStack, clusterNode);
- } else {
- RaftService.Client client = null;
- try {
- client = syncClientFactory.getSyncClient(clusterNode, this);
- } catch (TTransportException e) {
- // TODO throw me is better.
- if (ClusterIoTDB.printClientConnectionErrorStack) {
- logger.error(
- "Cannot open transport for client {}", syncClientFactory.nodeInfo(node), e);
- } else {
- logger.error("Cannot open transport for client {}", syncClientFactory.nodeInfo(node));
- }
- return null;
- }
- nodeClientNumMap.compute(
- clusterNode,
- (n, oldValue) -> {
- if (oldValue == null) return 1;
- return oldValue + 1;
- });
- return client;
- }
- } else {
- return clientStack.pop();
- }
- }
- }
-
- @SuppressWarnings("squid:S2273") // synchronized outside
- private RaftService.Client waitForClient(
- Deque<RaftService.Client> clientStack, ClusterNode clusterNode) {
- // wait for an available client
- long waitStart = System.currentTimeMillis();
- while (clientStack.isEmpty()) {
- try {
- clientStack.wait(waitClientTimeoutMS);
- if (clientStack.isEmpty()
- && System.currentTimeMillis() - waitStart >= waitClientTimeoutMS) {
- logger.warn(
- "Cannot get an available client after {}ms, create a new one", waitClientTimeoutMS);
- RaftService.Client client = syncClientFactory.getSyncClient(clusterNode, this);
- nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue + 1);
- return client;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn(
- "Interrupted when waiting for an available client of {}",
- syncClientFactory.nodeInfo(clusterNode));
- return null;
- } catch (TTransportException e) {
- if (ClusterIoTDB.printClientConnectionErrorStack) {
- logger.error(
- "Cannot open transport for client {}", syncClientFactory.nodeInfo(clusterNode), e);
- } else {
- logger.error(
- "Cannot open transport for client {}", syncClientFactory.nodeInfo(clusterNode));
- }
- return null;
- }
- }
- return clientStack.pop();
- }
-
- /**
- * Return a client of a node to the pool. Closed client should not be returned.
- *
- * @param node connection node
- * @param client push client to pool
- */
- public void putClient(Node node, RaftService.Client client) {
- ClusterNode clusterNode = new ClusterNode(node);
- // As clientCaches is ConcurrentHashMap, computeIfAbsent is thread safety.
- Deque<RaftService.Client> clientStack =
- clientCaches.computeIfAbsent(clusterNode, n -> new ArrayDeque<>());
- synchronized (clientStack) {
- if (client.getInputProtocol() != null && client.getInputProtocol().getTransport().isOpen()) {
- clientStack.push(client);
- NodeStatusManager.getINSTANCE().activate(node);
- } else {
- try {
- clientStack.push(syncClientFactory.getSyncClient(node, this));
- NodeStatusManager.getINSTANCE().activate(node);
- } catch (TTransportException e) {
- if (ClusterIoTDB.printClientConnectionErrorStack) {
- logger.error(
- "Cannot open transport for client {}", syncClientFactory.nodeInfo(node), e);
- } else {
- logger.error("Cannot open transport for client {}", syncClientFactory.nodeInfo(node));
- }
- nodeClientNumMap.computeIfPresent(clusterNode, (n, oldValue) -> oldValue - 1);
- NodeStatusManager.getINSTANCE().deactivate(node);
- }
- }
- clientStack.notifyAll();
- }
- }
-
- @TestOnly
- public Map<ClusterNode, Integer> getNodeClientNumMap() {
- return nodeClientNumMap;
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index aeb40cf..1939b8a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -19,64 +19,134 @@
package org.apache.iotdb.cluster.client.sync;
+import org.apache.iotdb.cluster.client.BaseFactory;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
+import java.net.SocketException;
+
/**
* Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
*/
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncDataClient extends TSDataServiceClient {
+// TODO: Refine the interfaces of TSDataService. The TSDataService interfaces do not need to
+// TODO: extend the RaftService interfaces.
+public class SyncDataClient extends TSDataService.Client {
+
+ private Node node;
+ private ClientCategory category;
+ private IClientManager clientManager;
- /** @param prot this constructor just create a new instance, but do not open the connection */
@TestOnly
public SyncDataClient(TProtocol prot) {
super(prot);
}
- SyncDataClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+ public SyncDataClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
throws TTransportException {
+
+ // clients of different categories differ only in the port, resolved via ClientUtils.getPort
super(
- protocolFactory,
- target.getInternalIp(),
- target.getDataPort(),
- ClusterConstant.getConnectionTimeoutInMS(),
- target,
- pool);
+ protocolFactory.getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ node.getInternalIp(),
+ ClientUtils.getPort(node, category),
+ ClusterConstant.getConnectionTimeoutInMS()))));
+ this.node = node;
+ this.category = category;
+ getInputProtocol().getTransport().open();
+ }
+
+ public SyncDataClient(
+ TProtocolFactory protocolFactory, Node node, ClientCategory category, IClientManager manager)
+ throws TTransportException {
+ this(protocolFactory, node, category);
+ this.clientManager = manager;
+ }
+
+ public void returnSelf() {
+ if (clientManager != null) clientManager.returnSyncClient(this, node, category);
+ }
+
+ public void setTimeout(int timeout) {
+ // the same transport is used in both input and output
+ ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+ }
+
+ public void close() {
+ getInputProtocol().getTransport().close();
+ }
+
+ @TestOnly
+ public int getTimeout() throws SocketException {
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
@Override
public String toString() {
- return String.format(
- "SyncDataClient (ip = %s, port = %d, id = %d)",
- target.getInternalIp(), target.getDataPort(), target.getNodeIdentifier());
+ return "Sync"
+ + category.getName()
+ + "{"
+ + "node="
+ + node
+ + ","
+ + "port="
+ + ClientUtils.getPort(node, category)
+ + '}';
}
- public static class Factory implements SyncClientFactory {
+ public Node getNode() {
+ return node;
+ }
+
+ public static class SyncDataClientFactory extends BaseFactory<Node, SyncDataClient> {
+
+ public SyncDataClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
+ super(protocolFactory, category);
+ }
+
+ public SyncDataClientFactory(
+ TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
+ super(protocolFactory, category, clientManager);
+ }
- private TProtocolFactory protocolFactory;
+ @Override
+ public void activateObject(Node node, PooledObject<SyncDataClient> pooledObject) {
+ pooledObject.getObject().setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ }
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
+ @Override
+ public void destroyObject(Node node, PooledObject<SyncDataClient> pooledObject) {
+ pooledObject.getObject().close();
}
@Override
- public SyncDataClient getSyncClient(Node node, SyncClientPool pool) throws TTransportException {
- return new SyncDataClient(protocolFactory, node, pool);
+ public PooledObject<SyncDataClient> makeObject(Node node) throws Exception {
+ return new DefaultPooledObject<>(
+ new SyncDataClient(protocolFactory, node, category, clientPoolManager));
}
@Override
- public String nodeInfo(Node node) {
- return String.format(
- "DataNode (ip = %s, port = %d, id = %d)",
- node.getInternalIp(), node.getDataPort(), node.getNodeIdentifier());
+ public boolean validateObject(Node node, PooledObject<SyncDataClient> pooledObject) {
+ return pooledObject.getObject() != null
+ && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
deleted file mode 100644
index 0629f9b..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
-
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
- */
-public class SyncDataHeartbeatClient extends TSDataServiceClient {
-
- private SyncDataHeartbeatClient(
- TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
- throws TTransportException {
- // the difference of the two clients lies in the port
- super(
- protocolFactory,
- target.getInternalIp(),
- target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getHeartbeatClientConnTimeoutMs(),
- target,
- pool);
- }
-
- @Override
- public String toString() {
- return String.format(
- "SyncDataHBClient (ip = %s, port = %d, id = %d)",
- target.getInternalIp(),
- target.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- target.getNodeIdentifier());
- }
-
- public static class Factory implements SyncClientFactory {
-
- private TProtocolFactory protocolFactory;
-
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public SyncDataHeartbeatClient getSyncClient(Node node, SyncClientPool pool)
- throws TTransportException {
- return new SyncDataHeartbeatClient(protocolFactory, node, pool);
- }
-
- @Override
- public String nodeInfo(Node node) {
- return String.format(
- "DataHBNode (ip = %s, port = %d, id = %d)",
- node.getInternalIp(),
- node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- node.getNodeIdentifier());
- }
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 7060083..62f031c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -19,64 +19,124 @@
package org.apache.iotdb.cluster.client.sync;
+import org.apache.iotdb.cluster.client.BaseFactory;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TConfigurationConst;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
-import org.apache.thrift.protocol.TProtocol;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
+import java.net.SocketException;
+
/**
* Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
+ * should not cache it anywhere else.
*/
-// the two classes does not share a common parent and Java does not allow multiple extension
-@SuppressWarnings("common-java:DuplicatedBlocks")
-public class SyncMetaClient extends TSMetaServiceClient {
+public class SyncMetaClient extends TSMetaService.Client {
- /** @param prot this constructor just create a new instance, but do not open the connection */
- @TestOnly
- SyncMetaClient(TProtocol prot) {
- super(prot);
- }
+ private Node node;
+ private ClientCategory category;
+ private IClientManager clientManager;
- private SyncMetaClient(TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
+ public SyncMetaClient(TProtocolFactory protocolFactory, Node node, ClientCategory category)
throws TTransportException {
super(
- protocolFactory,
- target.getInternalIp(),
- target.getMetaPort(),
- ClusterConstant.getConnectionTimeoutInMS(),
- target,
- pool);
+ protocolFactory.getProtocol(
+ RpcTransportFactory.INSTANCE.getTransport(
+ new TSocket(
+ TConfigurationConst.defaultTConfiguration,
+ node.getInternalIp(),
+ ClientUtils.getPort(node, category),
+ ClusterConstant.getConnectionTimeoutInMS()))));
+ this.node = node;
+ this.category = category;
+ getInputProtocol().getTransport().open();
+ }
+
+ public SyncMetaClient(
+ TProtocolFactory protocolFactory, Node node, ClientCategory category, IClientManager manager)
+ throws TTransportException {
+ this(protocolFactory, node, category);
+ this.clientManager = manager;
+ }
+
+ public void returnSelf() {
+ if (clientManager != null) clientManager.returnSyncClient(this, node, category);
+ }
+
+ public void setTimeout(int timeout) {
+ // the same transport is used in both input and output
+ ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
+ }
+
+ @TestOnly
+ public int getTimeout() throws SocketException {
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
+ }
+
+ public void close() {
+ getInputProtocol().getTransport().close();
+ }
+
+ public Node getNode() {
+ return node;
}
@Override
public String toString() {
- return String.format(
- "SyncMetaClient (ip = %s, port = %d, id = %d)",
- target.getInternalIp(), target.getMetaPort(), target.getNodeIdentifier());
+ return "Sync"
+ + category.getName()
+ + "{"
+ + "node="
+ + node
+ + ","
+ + "port="
+ + ClientUtils.getPort(node, category)
+ + '}';
}
- public static class Factory implements SyncClientFactory {
+ public static class SyncMetaClientFactory extends BaseFactory<Node, SyncMetaClient> {
+
+ public SyncMetaClientFactory(TProtocolFactory protocolFactory, ClientCategory category) {
+ super(protocolFactory, category);
+ }
+
+ public SyncMetaClientFactory(
+ TProtocolFactory protocolFactory, ClientCategory category, IClientManager clientManager) {
+ super(protocolFactory, category, clientManager);
+ }
- private TProtocolFactory protocolFactory;
+ @Override
+ public void activateObject(Node node, PooledObject<SyncMetaClient> pooledObject) {
+ pooledObject.getObject().setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ }
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
+ @Override
+ public void destroyObject(Node node, PooledObject<SyncMetaClient> pooledObject) {
+ pooledObject.getObject().close();
}
@Override
- public SyncMetaClient getSyncClient(Node node, SyncClientPool pool) throws TTransportException {
- return new SyncMetaClient(protocolFactory, node, pool);
+ public PooledObject<SyncMetaClient> makeObject(Node node) throws Exception {
+ return new DefaultPooledObject<>(
+ new SyncMetaClient(protocolFactory, node, category, clientPoolManager));
}
@Override
- public String nodeInfo(Node node) {
- return String.format(
- "MetaNode (ip = %s, port = %d, id = %d)",
- node.getInternalIp(), node.getMetaPort(), node.getNodeIdentifier());
+ public boolean validateObject(Node node, PooledObject<SyncMetaClient> pooledObject) {
+ return pooledObject.getObject() != null
+ && pooledObject.getObject().getInputProtocol().getTransport().isOpen();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
deleted file mode 100644
index 4d9514c..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.config.ClusterConstant;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.ClusterUtils;
-
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TTransportException;
-
-/**
- * Notice: Because a client will be returned to a pool immediately after a successful request, you
- * should not cache it anywhere else or there may be conflicts.
- */
-public class SyncMetaHeartbeatClient extends TSMetaServiceClient {
-
- private SyncMetaHeartbeatClient(
- TProtocolFactory protocolFactory, Node target, SyncClientPool pool)
- throws TTransportException {
- super(
- protocolFactory,
- target.getInternalIp(),
- target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getHeartbeatClientConnTimeoutMs(),
- target,
- pool);
- }
-
- @Override
- public String toString() {
- return String.format(
- "SyncMetaHBClient (ip = %s, port = %d, id = %d)",
- target.getInternalIp(),
- target.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
- target.getNodeIdentifier());
- }
-
- public static class Factory implements SyncClientFactory {
-
- private TProtocolFactory protocolFactory;
-
- public Factory(TProtocolFactory protocolFactory) {
- this.protocolFactory = protocolFactory;
- }
-
- @Override
- public SyncMetaHeartbeatClient getSyncClient(Node node, SyncClientPool pool)
- throws TTransportException {
- return new SyncMetaHeartbeatClient(protocolFactory, node, pool);
- }
-
- @Override
- public String nodeInfo(Node node) {
- return String.format(
- "MetaHBNode (ip = %s, port = %d, id = %d)",
- node.getInternalIp(), node.getMetaPort(), node.getNodeIdentifier());
- }
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSDataServiceClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSDataServiceClient.java
deleted file mode 100644
index 3102ebd..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSDataServiceClient.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TSDataService;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TimeoutChangeableTransport;
-
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.Closeable;
-import java.net.SocketException;
-
-public abstract class TSDataServiceClient extends TSDataService.Client implements Closeable {
-
- Node target;
- SyncClientPool pool;
-
- /** @param prot this constructor just create a new instance, but do not open the connection */
- @TestOnly
- TSDataServiceClient(TProtocol prot) {
- super(prot);
- }
-
- /**
- * cerate a new client and open the connection
- *
- * @param protocolFactory
- * @param ip
- * @param port
- * @param timeoutInMS
- * @param target
- * @param pool
- * @throws TTransportException
- */
- public TSDataServiceClient(
- TProtocolFactory protocolFactory,
- String ip,
- int port,
- int timeoutInMS,
- Node target,
- SyncClientPool pool)
- throws TTransportException {
-
- // the difference of the two clients lies in the port
- super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(ip, port, timeoutInMS)));
- this.target = target;
- this.pool = pool;
- getInputProtocol().getTransport().open();
- }
-
- public void setTimeout(int timeout) {
- // the same transport is used in both input and output
- ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
- }
-
- @TestOnly
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
- }
-
- /**
- * if the client does not open the connection, remove it.
- *
- * <p>if the client's connection is closed, create a new one.
- *
- * <p>if the client's connection is fine, put it back to the pool
- */
- public void putBack() {
- if (pool != null) {
- pool.putClient(target, this);
- } else {
- TProtocol inputProtocol = getInputProtocol();
- if (inputProtocol != null) {
- inputProtocol.getTransport().close();
- }
- }
- }
-
- /** put the client to pool, instead of close client. */
- @Override
- public void close() {
- putBack();
- }
-
- public Node getTarget() {
- return target;
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSMetaServiceClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSMetaServiceClient.java
deleted file mode 100644
index 6a3b0dd..0000000
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/TSMetaServiceClient.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.TSMetaService;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TimeoutChangeableTransport;
-
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TTransportException;
-
-import java.io.Closeable;
-import java.net.SocketException;
-
-public class TSMetaServiceClient extends TSMetaService.Client implements Closeable {
- Node target;
- SyncClientPool pool;
-
- /** @param prot this constructor just create a new instance, but do not open the connection */
- @TestOnly
- public TSMetaServiceClient(TProtocol prot) {
- super(prot);
- }
- /**
- * create a new client and open the connection
- *
- * @param protocolFactory
- * @param ip
- * @param port
- * @param timeoutInMS
- * @param target
- * @param pool
- * @throws TTransportException
- */
- public TSMetaServiceClient(
- TProtocolFactory protocolFactory,
- String ip,
- int port,
- int timeoutInMS,
- Node target,
- SyncClientPool pool)
- throws TTransportException {
-
- // the difference of the two clients lies in the port
- super(
- protocolFactory.getProtocol(
- RpcTransportFactory.INSTANCE.getTransport(ip, port, timeoutInMS)));
- this.target = target;
- this.pool = pool;
- getInputProtocol().getTransport().open();
- }
-
- public void setTimeout(int timeout) {
- // the same transport is used in both input and output
- ((TimeoutChangeableTransport) (getInputProtocol().getTransport())).setTimeout(timeout);
- }
-
- @TestOnly
- public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
- }
-
- /**
- * if the client does not open the connection, remove it.
- *
- * <p>if the client's connection is closed, create a new one.
- *
- * <p>if the client's connection is fine, put it back to the pool
- */
- public void putBack() {
- if (pool != null) {
- pool.putClient(target, this);
- } else {
- TProtocol inputProtocol = getInputProtocol();
- if (inputProtocol != null) {
- inputProtocol.getTransport().close();
- }
- }
- }
-
- /** put the client to pool, instead of close client. */
- @Override
- public void close() {
- putBack();
- }
-
- public Node getTarget() {
- return target;
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 9cdfb32..57f7cfc 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -34,7 +34,6 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.ClusterPlanRouter;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.utils.PartitionUtils;
@@ -68,7 +67,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -742,7 +740,7 @@ public class Coordinator {
} else {
status = forwardDataPlanSync(plan, node, group.getHeader());
}
- } catch (IOException e) {
+ } catch (Exception e) {
status = StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, e.getMessage());
}
if (!StatusUtils.TIME_OUT.equals(status)) {
@@ -766,49 +764,31 @@ public class Coordinator {
* @return a TSStatus indicating if the forwarding is successful.
*/
private TSStatus forwardDataPlanAsync(PhysicalPlan plan, Node receiver, RaftNode header)
- throws IOException {
- RaftService.AsyncClient client =
+ throws Exception {
+ AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
return this.metaGroupMember.forwardPlanAsync(plan, receiver, header, client);
}
private TSStatus forwardDataPlanSync(PhysicalPlan plan, Node receiver, RaftNode header)
- throws IOException {
- RaftService.Client client;
+ throws Exception {
+ SyncDataClient client = null;
try {
client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getSyncDataClient(receiver, ClusterConstant.getWriteOperationTimeoutMS());
+
+ return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client);
} catch (TException e) {
- throw new IOException(e);
+ client.close();
+ throw e;
+ } finally {
+ if (client != null) client.returnSelf();
}
- return this.metaGroupMember.forwardPlanSync(plan, receiver, header, client);
- }
-
- /**
- * Get a thrift client that will connect to "node" using the data port.
- *
- * @param node the node to be connected
- * @param timeout timeout threshold of connection
- */
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
- return ClusterIoTDB.getInstance().getClientProvider().getAsyncDataClient(node, timeout);
}
public Node getThisNode() {
return thisNode;
}
-
- /**
- * Get a thrift client that will connect to "node" using the data port.
- *
- * @param node the node to be connected
- * @param timeout timeout threshold of connection
- */
- public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
- return ClusterIoTDB.getInstance().getClientProvider().getSyncDataClient(node, timeout);
- }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
index a67cd50..de1f8c8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/FileSnapshot.java
@@ -33,7 +33,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.modification.ModificationFile;
@@ -350,13 +349,13 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
try {
client.removeHardLink(resource.getTsFile().getAbsolutePath());
} catch (TException te) {
- client.getInputProtocol().getTransport().close();
+ client.close();
logger.error(
"Cannot remove hardlink {} from {}",
resource.getTsFile().getAbsolutePath(),
sourceNode);
} finally {
- ClientUtils.putBackSyncClient(client);
+ if (client != null) client.returnSelf();
}
}
}
@@ -530,7 +529,8 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
if (client == null) {
throw new IOException("No available client for " + node.toString());
}
- ByteBuffer buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
+ ByteBuffer buffer;
+ buffer = SyncClientAdaptor.readFile(client, remotePath, offset, fetchSize);
int len = writeBuffer(buffer, dest);
if (len == 0) {
break;
@@ -576,9 +576,9 @@ public class FileSnapshot extends Snapshot implements TimeseriesSchemaSnapshot {
offset += len;
}
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
+ client.close();
} finally {
- ClientUtils.putBackSyncClient(client);
+ client.returnSelf();
}
dest.flush();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
index daa813d..50a5d99 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTask.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotRequest;
import org.apache.iotdb.cluster.rpc.thrift.PullSnapshotResp;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -163,10 +162,10 @@ public class PullSnapshotTask<T extends Snapshot> implements Callable<Void> {
try {
pullSnapshotResp = client.pullSnapshot(request);
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
+ client.close();
throw e;
} finally {
- ClientUtils.putBackSyncClient(client);
+ client.returnSelf();
}
result = new HashMap<>();
for (Entry<Integer, ByteBuffer> integerByteBufferEntry :
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 96b9f13..07e0cf3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -890,52 +890,52 @@ public class CMManager extends MManager {
private List<String> getUnregisteredSeriesListRemotely(
List<String> seriesList, PartitionGroup partitionGroup) {
for (Node node : partitionGroup) {
+ List<String> result = null;
try {
- List<String> result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
result =
SyncClientAdaptor.getUnregisteredMeasurements(
client, partitionGroup.getHeader(), seriesList);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- result =
- syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ result =
+ syncDataClient.getUnregisteredTimeseries(partitionGroup.getHeader(), seriesList);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
- if (result != null) {
- return result;
- }
- } catch (TException | IOException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
logger.error(
- "{}: cannot getting unregistered {} and other {} paths from {}",
+ "{}: getting unregistered series list {} ... {} is interrupted from {}",
metaGroupMember.getName(),
seriesList.get(0),
seriesList.get(seriesList.size() - 1),
node,
e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (Exception e) {
logger.error(
- "{}: getting unregistered series list {} ... {} is interrupted from {}",
+ "{}: cannot getting unregistered {} and other {} paths from {}",
metaGroupMember.getName(),
seriesList.get(0),
seriesList.get(seriesList.size() - 1),
node,
e);
}
+ if (result != null) {
+ return result;
+ }
}
return Collections.emptyList();
}
@@ -1043,11 +1043,11 @@ public class CMManager extends MManager {
// a non-null result contains correct result even if it is empty, so query next group
return paths;
}
- } catch (IOException | TException e) {
- throw new MetadataException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetadataException(e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
}
}
logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
@@ -1056,27 +1056,26 @@ public class CMManager extends MManager {
@SuppressWarnings("java:S1168") // null and empty list are different
private List<PartialPath> getMatchedPaths(
- Node node, RaftNode header, List<String> pathsToQuery, boolean withAlias)
- throws IOException, TException, InterruptedException {
+ Node node, RaftNode header, List<String> pathsToQuery, boolean withAlias) throws Exception {
GetAllPathsResult result;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
result = SyncClientAdaptor.getAllPaths(client, header, pathsToQuery, withAlias);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ result = syncDataClient.getAllPaths(header, pathsToQuery, withAlias);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
@@ -1178,11 +1177,11 @@ public class CMManager extends MManager {
}
return partialPaths;
}
- } catch (IOException | TException e) {
- throw new MetadataException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetadataException(e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
}
}
logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
@@ -1190,26 +1189,28 @@ public class CMManager extends MManager {
}
private Set<String> getMatchedDevices(Node node, RaftNode header, List<String> pathsToQuery)
- throws IOException, TException, InterruptedException {
+ throws Exception {
Set<String> paths;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
paths = SyncClientAdaptor.getAllDevices(client, header, pathsToQuery);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
try {
paths = syncDataClient.getAllDevices(header, pathsToQuery);
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
+ syncDataClient.close();
throw e;
}
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
return paths;
@@ -1610,6 +1611,8 @@ public class CMManager extends MManager {
} catch (InterruptedException e) {
logger.error("Interrupted when getting timeseries schemas in node {}.", node, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("Error occurs when getting timeseries schemas in node {}.", node, e);
}
}
@@ -1641,6 +1644,8 @@ public class CMManager extends MManager {
} catch (InterruptedException e) {
logger.error("Interrupted when getting devices schemas in node {}.", node, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("Error occurs when getting devices schemas in node {}.", node, e);
}
}
@@ -1656,22 +1661,21 @@ public class CMManager extends MManager {
}
private ByteBuffer showRemoteTimeseries(Node node, PartitionGroup group, ShowTimeSeriesPlan plan)
- throws IOException, TException, InterruptedException {
+ throws Exception {
ByteBuffer resultBinary;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
resultBinary = SyncClientAdaptor.getAllMeasurementSchema(client, group.getHeader(), plan);
} else {
+ SyncDataClient syncDataClient = null;
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
try {
plan.serialize(dataOutputStream);
resultBinary =
@@ -1679,40 +1683,41 @@ public class CMManager extends MManager {
group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
+ syncDataClient.close();
throw e;
}
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
return resultBinary;
}
private ByteBuffer getRemoteDevices(Node node, PartitionGroup group, ShowDevicesPlan plan)
- throws IOException, TException, InterruptedException {
+ throws Exception {
ByteBuffer resultBinary;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
resultBinary = SyncClientAdaptor.getDevices(client, group.getHeader(), plan);
} else {
+ SyncDataClient syncDataClient = null;
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
- DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
- SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- plan.serialize(dataOutputStream);
- resultBinary =
- syncDataClient.getDevices(
- group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ plan.serialize(dataOutputStream);
+ resultBinary =
+ syncDataClient.getDevices(
+ group.getHeader(), ByteBuffer.wrap(byteArrayOutputStream.toByteArray()));
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
return resultBinary;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
index 3e9d4e5..78d516e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/MetaPuller.java
@@ -48,7 +48,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -187,7 +186,8 @@ public class MetaPuller {
List<IMeasurementSchema> schemas = null;
try {
schemas = pullMeasurementSchemas(node, request);
- } catch (IOException | TException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
logger.error(
"{}: Cannot pull timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
@@ -195,8 +195,7 @@ public class MetaPuller {
request.getPrefixPaths().size() - 1,
node,
e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (Exception e) {
logger.error(
"{}: Cannot pull timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
@@ -224,36 +223,36 @@ public class MetaPuller {
}
private List<IMeasurementSchema> pullMeasurementSchemas(Node node, PullSchemaRequest request)
- throws TException, InterruptedException, IOException {
+ throws Exception {
List<IMeasurementSchema> schemas;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
schemas = SyncClientAdaptor.pullMeasurementSchema(client, request);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- // only need measurement name
- PullSchemaResp pullSchemaResp = syncDataClient.pullMeasurementSchema(request);
- ByteBuffer buffer = pullSchemaResp.schemaBytes;
- int size = buffer.getInt();
- schemas = new ArrayList<>(size);
- for (int i = 0; i < size; i++) {
- schemas.add(
- buffer.get() == 0
- ? MeasurementSchema.partialDeserializeFrom(buffer)
- : VectorMeasurementSchema.partialDeserializeFrom(buffer));
- }
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ // only need measurement name
+ PullSchemaResp pullSchemaResp = syncDataClient.pullMeasurementSchema(request);
+ ByteBuffer buffer = pullSchemaResp.schemaBytes;
+ int size = buffer.getInt();
+ schemas = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ schemas.add(
+ buffer.get() == 0
+ ? MeasurementSchema.partialDeserializeFrom(buffer)
+ : VectorMeasurementSchema.partialDeserializeFrom(buffer));
}
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
@@ -364,7 +363,8 @@ public class MetaPuller {
List<TimeseriesSchema> schemas = null;
try {
schemas = pullTimeSeriesSchemas(node, request);
- } catch (IOException | TException e) {
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
logger.error(
"{}: Cannot pull timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
@@ -372,8 +372,7 @@ public class MetaPuller {
request.getPrefixPaths().size() - 1,
node,
e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ } catch (Exception e) {
logger.error(
"{}: Cannot pull timeseries schemas of {} and other {} paths from {}",
metaGroupMember.getName(),
@@ -411,20 +410,19 @@ public class MetaPuller {
* null if there was a timeout.
*/
private List<TimeseriesSchema> pullTimeSeriesSchemas(Node node, PullSchemaRequest request)
- throws TException, InterruptedException, IOException {
+ throws Exception {
List<TimeseriesSchema> schemas;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
schemas = SyncClientAdaptor.pullTimeseriesSchema(client, request);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
-
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
PullSchemaResp pullSchemaResp = syncDataClient.pullTimeSeriesSchema(request);
ByteBuffer buffer = pullSchemaResp.schemaBytes;
int size = buffer.getInt();
@@ -432,6 +430,11 @@ public class MetaPuller {
for (int i = 0; i < size; i++) {
schemas.add(TimeseriesSchema.deserializeFrom(buffer));
}
+ } catch (TException e) {
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
index 5ac17be..6239003 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterPlanExecutor.java
@@ -247,25 +247,25 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
client.setTimeout(ClusterConstant.getReadOperationTimeoutMS());
count =
SyncClientAdaptor.getPathCount(
client, partitionGroup.getHeader(), pathsToQuery, level);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- syncDataClient.setTimeout(ClusterConstant.getReadOperationTimeoutMS());
- count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ syncDataClient.setTimeout(ClusterConstant.getReadOperationTimeoutMS());
+ count = syncDataClient.getPathCount(partitionGroup.getHeader(), pathsToQuery, level);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
logger.debug(
@@ -277,11 +277,11 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (count != null) {
return count;
}
- } catch (IOException | TException e) {
- throw new MetadataException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MetadataException(e);
+ } catch (Exception e) {
+ throw new MetadataException(e);
}
}
logger.warn("Cannot get paths of {} from {}", pathsToQuery, partitionGroup);
@@ -363,36 +363,34 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
paths =
SyncClientAdaptor.getNodeList(
client, group.getHeader(), schemaPattern.getFullPath(), level);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- paths =
- syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ paths =
+ syncDataClient.getNodeList(group.getHeader(), schemaPattern.getFullPath(), level);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
if (paths != null) {
break;
}
- } catch (IOException e) {
- logger.error(LOG_FAIL_CONNECT, node, e);
- } catch (TException e) {
- logger.error("Error occurs when getting node lists in node {}.", node, e);
} catch (InterruptedException e) {
logger.error("Interrupted when getting node lists in node {}.", node, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("Error occurs when getting node lists in node {}.", node, e);
}
}
return PartialPath.fromStringList(paths);
@@ -458,36 +456,34 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
nextChildrenNodes =
SyncClientAdaptor.getChildNodeInNextLevel(
client, group.getHeader(), path.getFullPath());
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- nextChildrenNodes =
- syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ nextChildrenNodes =
+ syncDataClient.getChildNodeInNextLevel(group.getHeader(), path.getFullPath());
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
if (nextChildrenNodes != null) {
break;
}
- } catch (IOException e) {
- logger.error(LOG_FAIL_CONNECT, node, e);
- } catch (TException e) {
- logger.error("Error occurs when getting node lists in node {}.", node, e);
} catch (InterruptedException e) {
logger.error("Interrupted when getting node lists in node {}.", node, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("Error occurs when getting node lists in node {}.", node, e);
}
}
return nextChildrenNodes;
@@ -576,35 +572,33 @@ public class ClusterPlanExecutor extends PlanExecutor {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
nextChildren =
SyncClientAdaptor.getNextChildren(client, group.getHeader(), path.getFullPath());
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- nextChildren =
- syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ nextChildren =
+ syncDataClient.getChildNodePathInNextLevel(group.getHeader(), path.getFullPath());
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
if (nextChildren != null) {
break;
}
- } catch (IOException e) {
- logger.error(LOG_FAIL_CONNECT, node, e);
- } catch (TException e) {
- logger.error("Error occurs when getting node lists in node {}.", node, e);
} catch (InterruptedException e) {
logger.error("Interrupted when getting node lists in node {}.", node, e);
Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ logger.error("Error occurs when getting node lists in node {}.", node, e);
}
}
return nextChildren;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
index a24852d..a2b4da9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/aggregate/ClusterAggregator.java
@@ -243,12 +243,12 @@ public class ClusterAggregator {
results);
return results;
}
- } catch (TException | IOException e) {
- logger.error(
- "{}: Cannot query aggregation {} from {}", metaGroupMember.getName(), path, node, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("{}: query {} interrupted from {}", metaGroupMember.getName(), path, node, e);
+ } catch (Exception e) {
+ logger.error(
+ "{}: Cannot query aggregation {} from {}", metaGroupMember.getName(), path, node, e);
}
}
throw new StorageEngineException(
@@ -256,27 +256,27 @@ public class ClusterAggregator {
}
private List<ByteBuffer> getRemoteAggregateResult(Node node, GetAggrResultRequest request)
- throws IOException, TException, InterruptedException {
- List<ByteBuffer> resultBuffers;
+ throws Exception {
+ List<ByteBuffer> resultBuffers = null;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
// each buffer is an AggregationResult
resultBuffers = SyncClientAdaptor.getAggrResult(client, request);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- resultBuffers = syncDataClient.getAggrResult(request);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ resultBuffers = syncDataClient.getAggrResult(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
return resultBuffers;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
index 933d145..bd72887 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/fill/ClusterPreviousFill.java
@@ -219,18 +219,10 @@ public class ClusterPreviousFill extends PreviousFill {
private ByteBuffer remoteAsyncPreviousFill(
Node node, PreviousFillRequest request, PreviousFillArguments arguments) {
ByteBuffer byteBuffer = null;
- AsyncDataClient asyncDataClient;
try {
- asyncDataClient =
+ AsyncDataClient asyncDataClient =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- } catch (IOException e) {
- logger.warn("{}: Cannot connect to {} during previous fill", metaGroupMember, node);
- return null;
- }
-
- try {
byteBuffer = SyncClientAdaptor.previousFill(asyncDataClient, request);
} catch (Exception e) {
logger.error(
@@ -246,17 +238,22 @@ public class ClusterPreviousFill extends PreviousFill {
private ByteBuffer remoteSyncPreviousFill(
Node node, PreviousFillRequest request, PreviousFillArguments arguments) {
ByteBuffer byteBuffer = null;
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- byteBuffer = syncDataClient.previousFill(request);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ byteBuffer = syncDataClient.previousFill(request);
+
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ logger.error(
+ "{}: Cannot perform previous fill of {} to {}",
+ metaGroupMember.getName(),
+ arguments.getPath(),
+ node,
+ e);
} catch (Exception e) {
logger.error(
"{}: Cannot perform previous fill of {} to {}",
@@ -264,6 +261,8 @@ public class ClusterPreviousFill extends PreviousFill {
arguments.getPath(),
node,
e);
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
return byteBuffer;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
index 4a69231..c49359c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutor.java
@@ -80,31 +80,31 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(source, ClusterConstant.getReadOperationTimeoutMS());
aggrBuffers =
SyncClientAdaptor.getGroupByResult(
client, header, executorId, curStartTime, curEndTime);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(source, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- aggrBuffers =
- syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(source, ClusterConstant.getReadOperationTimeoutMS());
+ aggrBuffers =
+ syncDataClient.getGroupByResult(header, executorId, curStartTime, curEndTime);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
- } catch (TException e) {
- throw new IOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
+ } catch (Exception e) {
+ throw new IOException(e);
}
resetAggregateResults();
if (aggrBuffers != null) {
@@ -130,31 +130,31 @@ public class RemoteGroupByExecutor implements GroupByExecutor {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(source, ClusterConstant.getReadOperationTimeoutMS());
aggrBuffer =
SyncClientAdaptor.peekNextNotNullValue(
client, header, executorId, nextStartTime, nextEndTime);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(source, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- aggrBuffer =
- syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(source, ClusterConstant.getReadOperationTimeoutMS());
+ aggrBuffer =
+ syncDataClient.peekNextNotNullValue(header, executorId, nextStartTime, nextEndTime);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
- } catch (TException e) {
- throw new IOException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e);
+ } catch (Exception e) {
+ throw new IOException(e);
}
Pair<Long, Object> result = null;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index d22aeda..efb7dff 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -225,30 +225,27 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
results.add(new Pair<>(true, pair));
}
return results;
- } catch (TException e) {
- logger.warn("Query last of {} from {} errored", group, seriesPaths, e);
- return Collections.emptyList();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Query last of {} from {} interrupted", group, seriesPaths, e);
return Collections.emptyList();
+ } catch (Exception e) {
+ logger.warn("Query last of {} from {} errored", group, seriesPaths, e);
+ return Collections.emptyList();
}
}
return Collections.emptyList();
}
- private ByteBuffer lastAsync(Node node, QueryContext context)
- throws TException, InterruptedException {
+ private ByteBuffer lastAsync(Node node, QueryContext context) throws Exception {
ByteBuffer buffer;
- AsyncDataClient asyncDataClient;
- try {
- asyncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- } catch (IOException e) {
+ AsyncDataClient asyncDataClient =
+ ClusterIoTDB.getInstance()
+ .getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ if (asyncDataClient == null) {
return null;
}
+
buffer =
SyncClientAdaptor.last(
asyncDataClient,
@@ -260,12 +257,13 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
return buffer;
}
- private ByteBuffer lastSync(Node node, QueryContext context) throws TException {
+ private ByteBuffer lastSync(Node node, QueryContext context) throws Exception {
ByteBuffer res;
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
try {
res =
syncDataClient.last(
@@ -275,12 +273,14 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
context.getQueryId(),
queryPlan.getDeviceToMeasurements(),
group.getHeader(),
- syncDataClient.getTarget()));
+ syncDataClient.getNode()));
} catch (TException e) {
// the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
+ syncDataClient.close();
throw e;
}
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
return res;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index ff744d7..9d4885a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -930,38 +930,37 @@ public class ClusterReaderFactory {
logger.debug("{}: no data for {} from {}", metaGroupMember.getName(), path, node);
return new EmptyReader();
}
- } catch (TException | IOException e) {
- logger.error("{}: Cannot query {} from {}", metaGroupMember.getName(), path, node, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("{}: Cannot query {} from {}", metaGroupMember.getName(), path, node, e);
+ } catch (Exception e) {
+ logger.error("{}: Cannot query {} from {}", metaGroupMember.getName(), path, node, e);
}
}
throw new StorageEngineException(
new RequestTimeOutException("Query " + path + " in " + partitionGroup));
}
- private Long getRemoteGroupByExecutorId(Node node, GroupByRequest request)
- throws IOException, TException, InterruptedException {
+ private Long getRemoteGroupByExecutorId(Node node, GroupByRequest request) throws Exception {
Long executorId;
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
executorId = SyncClientAdaptor.getGroupByExecutor(client, request);
} else {
- try (SyncDataClient syncDataClient =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- executorId = syncDataClient.getGroupByExecutor(request);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ executorId = syncDataClient.getGroupByExecutor(request);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
return executorId;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
index 1fc37f3..a4c8ced 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/DataSourceInfo.java
@@ -113,11 +113,11 @@ public class DataSourceInfo {
return false;
}
}
- } catch (TException | IOException e) {
- logger.error("Cannot query {} from {}", this.request.path, node, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Cannot query {} from {}", this.request.path, node, e);
+ } catch (Exception e) {
+ logger.error("Cannot query {} from {}", this.request.path, node, e);
}
nextNodePos = (nextNodePos + 1) % this.nodes.size();
if (nextNodePos == this.curPos) {
@@ -131,8 +131,7 @@ public class DataSourceInfo {
return false;
}
- private Long getReaderId(Node node, boolean byTimestamp, long timestamp)
- throws TException, InterruptedException, IOException {
+ private Long getReaderId(Node node, boolean byTimestamp, long timestamp) throws Exception {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return applyForReaderIdAsync(node, byTimestamp, timestamp);
}
@@ -140,12 +139,11 @@ public class DataSourceInfo {
}
private Long applyForReaderIdAsync(Node node, boolean byTimestamp, long timestamp)
- throws TException, InterruptedException, IOException {
+ throws Exception {
+ Long newReaderId;
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
- Long newReaderId;
if (byTimestamp) {
newReaderId = SyncClientAdaptor.querySingleSeriesByTimestamp(client, request);
} else {
@@ -155,35 +153,35 @@ public class DataSourceInfo {
}
private Long applyForReaderIdSync(Node node, boolean byTimestamp, long timestamp)
- throws TException {
+ throws Exception {
Long newReaderId;
- try (SyncDataClient client =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
-
- try {
- if (byTimestamp) {
- newReaderId = client.querySingleSeriesByTimestamp(request);
+ SyncDataClient client = null;
+ try {
+ client =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
+ if (byTimestamp) {
+ newReaderId = client.querySingleSeriesByTimestamp(request);
+ } else {
+ Filter newFilter;
+ // add timestamp to as a timeFilter to skip the data which has been read
+ if (request.isSetTimeFilterBytes()) {
+ Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
+ newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
} else {
- Filter newFilter;
- // add timestamp to as a timeFilter to skip the data which has been read
- if (request.isSetTimeFilterBytes()) {
- Filter timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
- newFilter = new AndFilter(timeFilter, TimeFilter.gt(timestamp));
- } else {
- newFilter = TimeFilter.gt(timestamp);
- }
- request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
- newReaderId = client.querySingleSeries(request);
+ newFilter = TimeFilter.gt(timestamp);
}
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- client.getInputProtocol().getTransport().close();
- throw e;
+ request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
+ newReaderId = client.querySingleSeries(request);
}
return newReaderId;
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (client != null) client.close();
+ throw e;
+ } finally {
+ if (client != null) client.returnSelf();
}
}
@@ -203,18 +201,16 @@ public class DataSourceInfo {
return this.curSource;
}
- AsyncDataClient getCurAsyncClient(int timeout) throws IOException {
+ AsyncDataClient getCurAsyncClient(int timeout) throws Exception {
return isNoClient
? null
- : ClusterIoTDB.getInstance()
- .getClientProvider()
- .getAsyncDataClient(this.curSource, timeout);
+ : ClusterIoTDB.getInstance().getAsyncDataClient(this.curSource, timeout);
}
- SyncDataClient getCurSyncClient(int timeout) throws TException {
+ SyncDataClient getCurSyncClient(int timeout) throws Exception {
return isNoClient
? null
- : ClusterIoTDB.getInstance().getClientProvider().getSyncDataClient(this.curSource, timeout);
+ : ClusterIoTDB.getInstance().getSyncDataClient(this.curSource, timeout);
}
public boolean isNoData() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
index 4969982..fbb829a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestamp.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.query.reader.series.IReaderByTimestamp;
import org.apache.iotdb.db.utils.SerializeUtils;
@@ -91,6 +90,8 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
Thread.currentThread().interrupt();
logger.warn("Query {} interrupted", sourceInfo);
return null;
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
return fetchResult.get();
@@ -116,10 +117,10 @@ public class RemoteSeriesReaderByTimestamp implements IReaderByTimestamp {
return null;
}
return fetchResultSync(timestamps, length);
+ } catch (Exception e) {
+ throw new IOException(e);
} finally {
- if (curSyncClient != null) {
- ClientUtils.putBackSyncClient(curSyncClient);
- }
+ if (curSyncClient != null) curSyncClient.returnSelf();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
index f72e73b..7d627d5 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReader.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
-import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.db.utils.SerializeUtils;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.common.BatchData;
@@ -138,6 +137,8 @@ public class RemoteSimpleSeriesReader implements IPointReader {
Thread.currentThread().interrupt();
logger.warn("Query {} interrupted", sourceInfo);
return null;
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
return fetchResult.get();
@@ -157,10 +158,10 @@ public class RemoteSimpleSeriesReader implements IPointReader {
return null;
}
return fetchResultSync();
+ } catch (Exception e) {
+ throw new IOException(e);
} finally {
- if (curSyncClient != null) {
- ClientUtils.putBackSyncClient(curSyncClient);
- }
+ if (curSyncClient != null) curSyncClient.returnSelf();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
index 29450b7..61a3a2b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/MultDataSourceInfo.java
@@ -118,11 +118,11 @@ public class MultDataSourceInfo {
return false;
}
}
- } catch (TException | IOException e) {
- logger.error("Cannot query {} from {}", this.request.path, node, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Cannot query {} from {}", this.request.path, node, e);
+ } catch (Exception e) {
+ logger.error("Cannot query {} from {}", this.request.path, node, e);
}
nextNodePos = (nextNodePos + 1) % this.nodes.size();
if (nextNodePos == this.curPos) {
@@ -140,19 +140,16 @@ public class MultDataSourceInfo {
return partialPaths;
}
- private Long getReaderId(Node node, long timestamp)
- throws TException, InterruptedException, IOException {
+ private Long getReaderId(Node node, long timestamp) throws Exception {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
return applyForReaderIdAsync(node, timestamp);
}
return applyForReaderIdSync(node, timestamp);
}
- private Long applyForReaderIdAsync(Node node, long timestamp)
- throws TException, InterruptedException, IOException {
+ private Long applyForReaderIdAsync(Node node, long timestamp) throws Exception {
AsyncDataClient client =
ClusterIoTDB.getInstance()
- .getClientProvider()
.getAsyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
AtomicReference<Long> result = new AtomicReference<>();
GenericHandler<Long> handler = new GenericHandler<>(client.getNode(), result);
@@ -174,14 +171,14 @@ public class MultDataSourceInfo {
return result.get();
}
- private Long applyForReaderIdSync(Node node, long timestamp) throws TException {
+ private Long applyForReaderIdSync(Node node, long timestamp) throws Exception {
Long newReaderId;
- try (SyncDataClient client =
- ClusterIoTDB.getInstance()
- .getClientProvider()
- .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS())) {
-
+ SyncDataClient client = null;
+ try {
+ client =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(node, ClusterConstant.getReadOperationTimeoutMS());
Filter newFilter;
// add timestamp to as a timeFilter to skip the data which has been read
if (request.isSetTimeFilterBytes()) {
@@ -191,14 +188,14 @@ public class MultDataSourceInfo {
newFilter = TimeFilter.gt(timestamp);
}
request.setTimeFilterBytes(SerializeUtils.serializeFilter(newFilter));
- try {
- newReaderId = client.queryMultSeries(request);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- client.getInputProtocol().getTransport().close();
- throw e;
- }
+ newReaderId = client.queryMultSeries(request);
return newReaderId;
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (client != null) client.close();
+ throw e;
+ } finally {
+ if (client != null) client.returnSelf();
}
}
@@ -214,18 +211,16 @@ public class MultDataSourceInfo {
return partitionGroup.getHeader();
}
- AsyncDataClient getCurAsyncClient(int timeout) throws IOException {
+ AsyncDataClient getCurAsyncClient(int timeout) throws Exception {
return isNoClient
? null
- : ClusterIoTDB.getInstance()
- .getClientProvider()
- .getAsyncDataClient(this.curSource, timeout);
+ : ClusterIoTDB.getInstance().getAsyncDataClient(this.curSource, timeout);
}
- SyncDataClient getCurSyncClient(int timeout) throws TException {
+ SyncDataClient getCurSyncClient(int timeout) throws Exception {
return isNoClient
? null
- : ClusterIoTDB.getInstance().getClientProvider().getSyncDataClient(this.curSource, timeout);
+ : ClusterIoTDB.getInstance().getSyncDataClient(this.curSource, timeout);
}
public boolean isNoData() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
index b43ee85..897c6c0 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReader.java
@@ -175,26 +175,26 @@ public class RemoteMultSeriesReader extends AbstractMultPointReader {
} catch (TException | InterruptedException e) {
logger.error("Failed to fetch result async, connect to {}", sourceInfo, e);
return null;
+ } catch (Exception e) {
+ throw new IOException(e);
}
}
return fetchResult.get();
}
private Map<String, ByteBuffer> fetchResultSync(List<String> paths) throws IOException {
-
- try (SyncDataClient curSyncClient =
- sourceInfo.getCurSyncClient(ClusterConstant.getReadOperationTimeoutMS()); ) {
- try {
- return curSyncClient.fetchMultSeries(
- sourceInfo.getHeader(), sourceInfo.getReaderId(), paths);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- curSyncClient.getInputProtocol().getTransport().close();
- throw e;
- }
+ SyncDataClient curSyncClient = null;
+ try {
+ curSyncClient = sourceInfo.getCurSyncClient(ClusterConstant.getReadOperationTimeoutMS());
+ return curSyncClient.fetchMultSeries(sourceInfo.getHeader(), sourceInfo.getReaderId(), paths);
} catch (TException e) {
+ if (curSyncClient != null) curSyncClient.close();
logger.error("Failed to fetch result sync, connect to {}", sourceInfo, e);
return null;
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ if (curSyncClient != null) curSyncClient.returnSelf();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
index a0a204e..c4e3e5c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClusterTSServiceImpl.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.cluster.server;
+import org.apache.iotdb.cluster.ClusterIoTDB;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
@@ -46,7 +47,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -145,23 +145,28 @@ public class ClusterTSServiceImpl extends TSServiceImpl {
try {
if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
AsyncDataClient client =
- coordinator.getAsyncDataClient(
- queriedNode, ClusterConstant.getReadOperationTimeoutMS());
+ ClusterIoTDB.getInstance()
+ .getAsyncDataClient(queriedNode, ClusterConstant.getReadOperationTimeoutMS());
client.endQuery(header, coordinator.getThisNode(), queryId, handler);
} else {
- try (SyncDataClient syncDataClient =
- coordinator.getSyncDataClient(
- queriedNode, ClusterConstant.getReadOperationTimeoutMS())) {
- try {
- syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
- } catch (TException e) {
- // the connection may be broken, close it to avoid it being reused
- syncDataClient.getInputProtocol().getTransport().close();
- throw e;
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient =
+ ClusterIoTDB.getInstance()
+ .getSyncDataClient(
+ queriedNode, ClusterConstant.getReadOperationTimeoutMS());
+ syncDataClient.endQuery(header, coordinator.getThisNode(), queryId);
+ } catch (TException e) {
+ // the connection may be broken, close it to avoid it being reused
+ if (syncDataClient != null) syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) {
+ syncDataClient.returnSelf();
}
}
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
logger.error("Cannot end query {} in {}", queryId, queriedNode);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
index ed99a45..602ac44 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/PullSnapshotHintService.java
@@ -135,15 +135,25 @@ public class PullSnapshotHintService {
private boolean sendHintsAsync(Node receiver, PullSnapshotHint hint)
throws TException, InterruptedException {
AsyncDataClient asyncDataClient = (AsyncDataClient) member.getAsyncClient(receiver);
+ if (asyncDataClient == null) {
+ return false;
+ }
return SyncClientAdaptor.onSnapshotApplied(asyncDataClient, hint.getHeader(), hint.slots);
}
private boolean sendHintSync(Node receiver, PullSnapshotHint hint) throws TException {
- try (SyncDataClient syncDataClient = (SyncDataClient) member.getSyncClient(receiver)) {
+ SyncDataClient syncDataClient = null;
+ try {
+ syncDataClient = (SyncDataClient) member.getSyncClient(receiver);
if (syncDataClient == null) {
return false;
}
return syncDataClient.onSnapshotApplied(hint.getHeader(), hint.slots);
+ } catch (TException e) {
+ syncDataClient.close();
+ throw e;
+ } finally {
+ if (syncDataClient != null) syncDataClient.returnSelf();
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
index a455290..3babe9f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThread.java
@@ -32,7 +32,6 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.handlers.caller.ElectionHandler;
import org.apache.iotdb.cluster.server.handlers.caller.HeartbeatHandler;
import org.apache.iotdb.cluster.server.member.RaftMember;
-import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
@@ -247,7 +246,7 @@ public class HeartbeatThread implements Runnable {
logger.warn(
memberName + ": Cannot send heart beat to node " + node.toString(), e);
} finally {
- ClientUtils.putBackSyncHeartbeatClient(client);
+ localMember.returnSyncClient(client);
}
}
});
@@ -432,7 +431,7 @@ public class HeartbeatThread implements Runnable {
} catch (Exception e) {
handler.onError(e);
} finally {
- ClientUtils.putBackSyncHeartbeatClient(client);
+ localMember.returnSyncClient(client);
}
}
});
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index dbad251..3e585f2 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -19,13 +19,8 @@
package org.apache.iotdb.cluster.server.member;
-import org.apache.iotdb.cluster.client.async.AsyncClientPool;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.SingleManagerFactory;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.sync.SyncClientPool;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
@@ -208,11 +203,9 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
+ "-raftId-"
+ nodes.getId()
+ "",
- new AsyncClientPool(new AsyncDataClient.Factory(factory)),
- new SyncClientPool(new SyncDataClient.Factory(factory)),
- new AsyncClientPool(new AsyncDataHeartbeatClient.Factory(factory)),
- new SyncClientPool(new SyncDataHeartbeatClient.Factory(factory)),
- new AsyncClientPool(new SingleManagerFactory(factory)));
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.DataGroupClient));
this.metaGroupMember = metaGroupMember;
allNodes = nodes;
mbeanName =
@@ -771,6 +764,11 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
}
@Override
+ ClientCategory getClientCategory() {
+ return ClientCategory.DATA;
+ }
+
+ @Override
public String getMBeanName() {
return mbeanName;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 74fb7da..ccfb219 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -20,13 +20,11 @@
package org.apache.iotdb.cluster.server.member;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.async.AsyncClientPool;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
-import org.apache.iotdb.cluster.client.sync.SyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.coordinator.Coordinator;
@@ -72,7 +70,6 @@ import org.apache.iotdb.cluster.server.monitor.NodeReport.MetaMemberReport;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.service.DataGroupServiceImpls;
-import org.apache.iotdb.cluster.utils.ClientUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.cluster.utils.PartitionUtils;
import org.apache.iotdb.cluster.utils.StatusUtils;
@@ -232,10 +229,9 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator) {
super(
"Meta",
- new AsyncClientPool(new AsyncMetaClient.Factory(factory)),
- new SyncClientPool(new SyncMetaClient.Factory(factory)),
- new AsyncClientPool(new AsyncMetaHeartbeatClient.Factory(factory)),
- new SyncClientPool(new SyncMetaHeartbeatClient.Factory(factory)));
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.MetaGroupClient));
allNodes = new PartitionGroup();
initPeerMap();
@@ -500,10 +496,10 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
try {
resp = client.addNode(thisNode, startUpStatus);
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
+ client.close();
throw e;
} finally {
- ClientUtils.putBackSyncClient(client);
+ client.returnSelf();
}
}
@@ -802,20 +798,32 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
// no need to shake hands with yourself
continue;
}
- try {
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- AsyncMetaClient asyncClient = (AsyncMetaClient) getAsyncClient(node);
- if (asyncClient != null) {
+
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ AsyncMetaClient asyncClient = (AsyncMetaClient) getAsyncClient(node);
+ if (asyncClient != null) {
+ try {
asyncClient.handshake(thisNode, new GenericHandler<>(node, null));
+ } catch (TException e) {
+ logger.error("failed send handshake to node: {}", node, e);
}
} else {
- SyncMetaClient syncClient = (SyncMetaClient) getSyncClient(node);
- if (syncClient != null) {
+ logger.error("send handshake fail as get empty async client");
+ }
+ } else {
+ SyncMetaClient syncClient = (SyncMetaClient) getSyncClient(node);
+ if (syncClient != null) {
+ try {
syncClient.handshake(thisNode);
+ } catch (TException e) {
+ syncClient.close();
+ logger.error("failed send handshake to node: {}", node, e);
+ } finally {
+ syncClient.returnSelf();
}
+ } else {
+ logger.error("send handshake fail as get empty sync client");
}
- } catch (TException e) {
- // ignore handshake exceptions
}
}
}
@@ -1209,10 +1217,10 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
try {
return client.checkStatus(getStartUpStatus());
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
+ client.close();
logger.warn("Error occurs when check status on node : {}", seedNode);
} finally {
- ClientUtils.putBackSyncClient(client);
+ client.returnSelf();
}
}
return null;
@@ -1391,6 +1399,11 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
}
@Override
+ ClientCategory getClientCategory() {
+ return ClientCategory.META;
+ }
+
+ @Override
public String getMBeanName() {
return mbeanName;
}
@@ -1587,9 +1600,9 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
try {
response = client.checkAlive();
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
+ client.close();
} finally {
- ClientUtils.putBackSyncClient(client);
+ client.returnSelf();
}
nodeStatusHandler.onComplete(response);
}
@@ -1634,7 +1647,14 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
if (client == null) {
return null;
}
- return ClusterUtils.deserializeMigrationStatus(client.collectMigrationStatus());
+ try {
+ return ClusterUtils.deserializeMigrationStatus(client.collectMigrationStatus());
+ } catch (TException e) {
+ client.close();
+ throw e;
+ } finally {
+ client.returnSelf();
+ }
}
@TestOnly
@@ -1837,12 +1857,14 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
Node node = removeNodeLog.getRemovedNode();
if (config.isUseAsyncServer()) {
AsyncMetaClient asyncMetaClient = (AsyncMetaClient) getAsyncClient(node);
- try {
- if (asyncMetaClient != null) {
+ if (asyncMetaClient != null) {
+ try {
asyncMetaClient.exile(removeNodeLog.serialize(), new GenericHandler<>(node, null));
+ } catch (TException e) {
+ logger.warn("Cannot inform {} its removal", node, e);
}
- } catch (TException e) {
- logger.warn("Cannot inform {} its removal", node, e);
+ } else {
+ logger.error("exile node fail for node: {} as empty client", node);
}
} else {
SyncMetaClient client = (SyncMetaClient) getSyncClient(node);
@@ -1852,10 +1874,10 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
try {
client.exile(removeNodeLog.serialize());
} catch (TException e) {
- client.getInputProtocol().getTransport().close();
+ client.close();
logger.warn("Cannot inform {} its removal", node, e);
} finally {
- ClientUtils.putBackSyncClient(client);
+ client.returnSelf();
}
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 76a08b8..73b7b3a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -20,9 +20,11 @@
package org.apache.iotdb.cluster.server.member;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.async.AsyncClientPool;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.ClientManager;
import org.apache.iotdb.cluster.client.sync.SyncClientAdaptor;
-import org.apache.iotdb.cluster.client.sync.SyncClientPool;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -55,6 +57,7 @@ import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.AppendNodeEntryHandler;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
+import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Peer;
import org.apache.iotdb.cluster.server.monitor.Timer;
import org.apache.iotdb.cluster.server.monitor.Timer.Statistic;
@@ -202,15 +205,10 @@ public abstract class RaftMember implements RaftMemberMBean {
*/
private Map<Node, Long> lastCatchUpResponseTime = new ConcurrentHashMap<>();
/**
- * the pool that provides reusable Thrift clients to connect to other RaftMembers and execute RPC
- * requests. It will be initialized according to the implementation of the subclasses
+ * client manager that provides reusable Thrift clients to connect to other RaftMembers and
+ * execute RPC requests. It will be initialized according to the implementation of the subclasses
*/
- private AsyncClientPool asyncClientPool;
-
- private AsyncClientPool asyncSendLogClientPool;
- private SyncClientPool syncClientPool;
- private AsyncClientPool asyncHeartbeatClientPool;
- private SyncClientPool syncHeartbeatClientPool;
+ private ClientManager clientManager;
/**
* when the commit progress is updated by a heartbeat, this object is notified so that we may know
* if this node is up-to-date with the leader, and whether the given consistency is reached
@@ -246,33 +244,9 @@ public abstract class RaftMember implements RaftMemberMBean {
protected RaftMember() {}
- protected RaftMember(
- String name,
- AsyncClientPool asyncPool,
- SyncClientPool syncPool,
- AsyncClientPool asyncHeartbeatPool,
- SyncClientPool syncHeartbeatPool) {
- this.name = name;
- this.asyncClientPool = asyncPool;
- this.syncClientPool = syncPool;
- this.asyncHeartbeatClientPool = asyncHeartbeatPool;
- this.syncHeartbeatClientPool = syncHeartbeatPool;
- this.asyncSendLogClientPool = asyncClientPool;
- }
-
- protected RaftMember(
- String name,
- AsyncClientPool asyncPool,
- SyncClientPool syncPool,
- AsyncClientPool asyncHeartbeatPool,
- SyncClientPool syncHeartbeatPool,
- AsyncClientPool asyncSendLogClientPool) {
+ protected RaftMember(String name, ClientManager clientManager) {
this.name = name;
- this.asyncClientPool = asyncPool;
- this.syncClientPool = syncPool;
- this.asyncHeartbeatClientPool = asyncHeartbeatPool;
- this.syncHeartbeatClientPool = syncHeartbeatPool;
- this.asyncSendLogClientPool = asyncSendLogClientPool;
+ this.clientManager = clientManager;
}
/**
@@ -597,24 +571,6 @@ public abstract class RaftMember implements RaftMemberMBean {
return localExecutor;
}
- /**
- * Get an asynchronous heartbeat thrift client to the given node.
- *
- * @return an asynchronous thrift client or null if the caller tries to connect the local node.
- */
- public AsyncClient getAsyncHeartbeatClient(Node node) {
- return getAsyncClient(node, asyncHeartbeatClientPool, false);
- }
-
- /**
- * NOTICE: client.putBack() must be called after use.
- *
- * @return the heartbeat client for the node
- */
- public Client getSyncHeartbeatClient(Node node) {
- return getSyncClient(syncHeartbeatClientPool, node, false);
- }
-
public void sendLogAsync(
Log log,
AtomicInteger voteCounter,
@@ -636,14 +592,6 @@ public abstract class RaftMember implements RaftMemberMBean {
}
}
- private Client getSyncClient(SyncClientPool pool, Node node, boolean activatedOnly) {
- if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
- return null;
- }
-
- return pool.getClient(node, activatedOnly);
- }
-
public NodeCharacter getCharacter() {
return character;
}
@@ -774,6 +722,8 @@ public abstract class RaftMember implements RaftMemberMBean {
*/
abstract TSStatus executeNonQueryPlan(PhysicalPlan plan);
+ abstract ClientCategory getClientCategory();
+
/**
* according to the consistency configuration, decide whether to execute syncLeader or not and
* throws exception when failed. Note that the write request will always try to sync leader
@@ -1426,6 +1376,7 @@ public abstract class RaftMember implements RaftMemberMBean {
logger.error(MSG_FORWARD_ERROR, name, plan, receiver, e);
return StatusUtils.getStatus(StatusUtils.INTERNAL_ERROR, e.getMessage());
} catch (TException e) {
+ client.getInputProtocol().getTransport().close();
TSStatus status;
if (e.getCause() instanceof SocketTimeoutException) {
status = StatusUtils.TIME_OUT;
@@ -1449,42 +1400,94 @@ public abstract class RaftMember implements RaftMemberMBean {
* the node cannot be reached.
*/
public AsyncClient getAsyncClient(Node node) {
- return getAsyncClient(node, asyncClientPool, true);
+ try {
+ return clientManager.borrowAsyncClient(node, getClientCategory());
+ } catch (Exception e) {
+ logger.error("borrow async client fail", e);
+ return null;
+ }
}
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node, asyncClientPool, activatedOnly);
+ public AsyncClient getSendLogAsyncClient(Node node) {
+ try {
+ return clientManager.borrowAsyncClient(node, ClientCategory.DATA_ASYNC_APPEND_CLIENT);
+ } catch (Exception e) {
+ logger.error("borrow send log async client fail", e);
+ return null;
+ }
}
- public AsyncClient getSendLogAsyncClient(Node node) {
- return getAsyncClient(node, asyncSendLogClientPool, true);
+ /**
+ * NOTICE: ClientManager.returnClient() must be called after use. the caller needs to check to see
+ * if the return value is null
+ *
+ * @param node the node to connect
+ * @return the client if node is available, otherwise null
+ */
+ public Client getSyncClient(Node node) {
+ try {
+ return clientManager.borrowSyncClient(node, getClientCategory());
+ } catch (Exception e) {
+ logger.error("borrow sync client fail", e);
+ return null;
+ }
}
- private AsyncClient getAsyncClient(Node node, AsyncClientPool pool, boolean activatedOnly) {
+ public Client getSyncClient(Node node, boolean activatedOnly) {
if (ClusterConstant.EMPTY_NODE.equals(node) || node == null) {
return null;
}
+
+ if (activatedOnly && !NodeStatusManager.getINSTANCE().isActivated(node)) {
+ return null;
+ }
+
+ return getSyncClient(node);
+ }
+
+ /**
+ * Get an asynchronous heartbeat thrift client to the given node.
+ *
+ * @return an asynchronous thrift client or null if the caller tries to connect the local node.
+ */
+ public AsyncClient getAsyncHeartbeatClient(Node node) {
+ ClientCategory category =
+ ClientCategory.META == getClientCategory()
+ ? ClientCategory.META_HEARTBEAT
+ : ClientCategory.DATA_HEARTBEAT;
+
try {
- return pool.getClient(node, activatedOnly);
- } catch (IOException e) {
- logger.warn("{} cannot connect to node {}", name, node, e);
+ return clientManager.borrowAsyncClient(node, category);
+ } catch (Exception e) {
+ logger.error("borrow async heartbeat client fail", e);
return null;
}
}
/**
- * NOTICE: client.putBack() must be called after use. the caller needs to check to see if the
- * return value is null
+ * NOTICE: returnSyncClient(client) must be called after use.
*
- * @param node the node to connect
- * @return the client if node is available, otherwise null
+ * @return the heartbeat client for the node
*/
- public Client getSyncClient(Node node) {
- return getSyncClient(syncClientPool, node, true);
+ public Client getSyncHeartbeatClient(Node node) {
+ ClientCategory category =
+ ClientCategory.META == getClientCategory()
+ ? ClientCategory.META_HEARTBEAT
+ : ClientCategory.DATA_HEARTBEAT;
+ try {
+ return clientManager.borrowSyncClient(node, category);
+ } catch (Exception e) {
+ logger.error("borrow sync heartbeat client fail", e);
+ return null;
+ }
}
- public Client getSyncClient(Node node, boolean activatedOnly) {
- return getSyncClient(syncClientPool, node, activatedOnly);
+ public void returnSyncClient(Client client) {
+ if (ClientCategory.META == getClientCategory()) {
+ ((SyncMetaClient) client).returnSelf();
+ } else {
+ ((SyncDataClient) client).returnSelf();
+ }
}
public AtomicLong getTerm() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
index 255cb4b..af4bd60 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClientUtils.java
@@ -19,16 +19,11 @@
package org.apache.iotdb.cluster.utils;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient;
+import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient;
import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
public class ClientUtils {
@@ -36,35 +31,38 @@ public class ClientUtils {
// util class
}
- public static boolean isHeartbeatClientReady(AsyncClient client) {
- if (client instanceof AsyncDataHeartbeatClient) {
- return ((AsyncDataHeartbeatClient) client).isReady();
- } else {
- return ((AsyncMetaHeartbeatClient) client).isReady();
- }
- }
-
- public static void putBackSyncHeartbeatClient(Client client) {
- if (client instanceof SyncMetaHeartbeatClient) {
- ((SyncMetaHeartbeatClient) client).putBack();
- } else {
- ((SyncDataHeartbeatClient) client).putBack();
- }
- }
-
- public static void putBackSyncClient(Client client) {
- if (client instanceof SyncDataClient) {
- ((SyncDataClient) client).putBack();
- } else if (client instanceof SyncMetaClient) {
- ((SyncMetaClient) client).putBack();
+ public static int getPort(Node node, ClientCategory category) {
+ int port = -1;
+ switch (category) {
+ case DATA:
+ port = node.getDataPort();
+ break;
+ case DATA_HEARTBEAT:
+ port = node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET;
+ break;
+ case META:
+ port = node.getMetaPort();
+ break;
+ case META_HEARTBEAT:
+ port = node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET;
+ break;
+ case DATA_ASYNC_APPEND_CLIENT:
+ // special data client type
+ port = node.getDataPort();
+ break;
+ default:
+ break;
}
+ return port;
}
- public static boolean isClientReady(AsyncClient client) {
- if (client instanceof AsyncDataClient) {
- return ((AsyncDataClient) client).isReady();
+ public static void putBackSyncClient(RaftService.Client client) {
+ if (client instanceof SyncMetaClient) {
+ ((SyncMetaClient) client).returnSelf();
+ } else if (client instanceof SyncDataClient) {
+ ((SyncDataClient) client).returnSelf();
} else {
- return ((AsyncMetaClient) client).isReady();
+ throw new UnsupportedOperationException("the client type is not supported: " + client);
}
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/BaseClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/BaseClientTest.java
new file mode 100644
index 0000000..89fd51a
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/BaseClientTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+
+public class BaseClientTest {
+
+ protected Node defaultNode = constructDefaultNode();
+
+ private ServerSocket metaServer;
+ private Thread metaServerListeningThread;
+
+ private ServerSocket dataServer;
+ private Thread dataServerListeningThread;
+
+ private ServerSocket metaHeartbeatServer;
+ private Thread metaHeartbeatServerListeningThread;
+
+ private ServerSocket dataHeartbeatServer;
+ private Thread dataHeartbeatServerListeningThread;
+
+ public void startMetaServer() throws IOException {
+ metaServer = new ServerSocket(ClientUtils.getPort(defaultNode, ClientCategory.META));
+ metaServerListeningThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ metaServer.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ metaServerListeningThread.start();
+ }
+
+ public void startDataServer() throws IOException {
+ dataServer = new ServerSocket(ClientUtils.getPort(defaultNode, ClientCategory.DATA));
+ dataServerListeningThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ dataServer.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ dataServerListeningThread.start();
+ }
+
+ public void startMetaHeartbeatServer() throws IOException {
+ metaHeartbeatServer =
+ new ServerSocket(ClientUtils.getPort(defaultNode, ClientCategory.META_HEARTBEAT));
+ metaHeartbeatServerListeningThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ metaHeartbeatServer.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ metaHeartbeatServerListeningThread.start();
+ }
+
+ public void startDataHeartbeatServer() throws IOException {
+ dataHeartbeatServer =
+ new ServerSocket(ClientUtils.getPort(defaultNode, ClientCategory.DATA_HEARTBEAT));
+ dataHeartbeatServerListeningThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ dataHeartbeatServer.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ dataHeartbeatServerListeningThread.start();
+ }
+
+ public void stopMetaServer() throws InterruptedException, IOException {
+ if (metaServer != null) {
+ metaServer.close();
+ }
+ if (metaServerListeningThread != null) {
+ metaServerListeningThread.interrupt();
+ metaServerListeningThread.join();
+ }
+ }
+
+ public void stopDataServer() throws IOException, InterruptedException {
+ if (dataServer != null) {
+ dataServer.close();
+ }
+ if (dataServerListeningThread != null) {
+ dataServerListeningThread.interrupt();
+ dataServerListeningThread.join();
+ }
+ }
+
+ public void stopMetaHeartbeatServer() throws IOException, InterruptedException {
+ if (metaHeartbeatServer != null) {
+ metaHeartbeatServer.close();
+ }
+ if (metaHeartbeatServerListeningThread != null) {
+ metaHeartbeatServerListeningThread.interrupt();
+ metaHeartbeatServerListeningThread.join();
+ }
+ }
+
+ public void stopDataHeartbeatServer() throws IOException, InterruptedException {
+   if (dataHeartbeatServer != null) {
+     dataHeartbeatServer.close(); // unblocks accept() so the listener loop can exit
+   }
+   if (dataHeartbeatServerListeningThread != null) { // the heartbeat listener, not the data one
+     dataHeartbeatServerListeningThread.interrupt();
+     dataHeartbeatServerListeningThread.join(); // wait until the heartbeat listener has stopped
+   }
+ }
+
+ public Node constructDefaultNode() {
+ Node node = new Node();
+ node.setMetaPort(9003).setInternalIp("localhost").setClientIp("localhost");
+ node.setDataPort(40010);
+ return node;
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java
new file mode 100644
index 0000000..9598822
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientManagerTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+ /**
+  * Checks, for each {@link ClientManager.Type} in sync and async mode, which client categories
+  * yield a usable client and which ones yield {@code null}.
+  */
+ public class ClientManagerTest extends BaseClientTest {
+
+   /** All client categories, for asserting that a manager serves no client of a given kind. */
+   private static final ClientCategory[] ALL_CATEGORIES = {
+     ClientCategory.DATA,
+     ClientCategory.DATA_HEARTBEAT,
+     ClientCategory.META,
+     ClientCategory.META_HEARTBEAT
+   };
+
+   @Before
+   public void setUp() throws IOException {
+     startDataServer();
+     startMetaServer();
+     startDataHeartbeatServer();
+     startMetaHeartbeatServer();
+   }
+
+   @After
+   public void tearDown() throws IOException, InterruptedException {
+     stopDataServer();
+     stopMetaServer();
+     stopDataHeartbeatServer();
+     stopMetaHeartbeatServer();
+   }
+
+   @Test
+   public void syncClientManagersTest() throws Exception {
+     // ---------Sync cluster clients manager test------------
+     ClientManager clusterManager =
+         new ClientManager(false, ClientManager.Type.RequestForwardClient);
+     assertSyncDataClientUsable(clusterManager, ClientCategory.DATA);
+     assertNoSyncClient(
+         clusterManager,
+         ClientCategory.DATA_HEARTBEAT,
+         ClientCategory.META,
+         ClientCategory.META_HEARTBEAT);
+     assertNoAsyncClient(clusterManager, ALL_CATEGORIES);
+
+     // ---------Sync meta(meta heartbeat) clients manager test------------
+     ClientManager metaManager = new ClientManager(false, ClientManager.Type.MetaGroupClient);
+     assertSyncMetaClientUsable(metaManager, ClientCategory.META);
+     assertSyncMetaClientUsable(metaManager, ClientCategory.META_HEARTBEAT);
+     assertNoSyncClient(metaManager, ClientCategory.DATA, ClientCategory.DATA_HEARTBEAT);
+     assertNoAsyncClient(metaManager, ALL_CATEGORIES);
+
+     // ---------Sync data(data heartbeat) clients manager test------------
+     ClientManager dataManager = new ClientManager(false, ClientManager.Type.DataGroupClient);
+     assertSyncDataClientUsable(dataManager, ClientCategory.DATA);
+     assertSyncDataClientUsable(dataManager, ClientCategory.DATA_HEARTBEAT);
+     assertNoSyncClient(dataManager, ClientCategory.META, ClientCategory.META_HEARTBEAT);
+     assertNoAsyncClient(dataManager, ALL_CATEGORIES);
+   }
+
+   @Test
+   public void asyncClientManagersTest() throws Exception {
+     // ---------async cluster clients manager test------------
+     ClientManager clusterManager = new ClientManager(true, ClientManager.Type.RequestForwardClient);
+     assertAsyncDataClientUsable(clusterManager, ClientCategory.DATA);
+     assertNoAsyncClient(
+         clusterManager,
+         ClientCategory.DATA_HEARTBEAT,
+         ClientCategory.META,
+         ClientCategory.META_HEARTBEAT);
+     assertNoSyncClient(clusterManager, ALL_CATEGORIES);
+
+     // ---------async meta(meta heartbeat) clients manager test------------
+     ClientManager metaManager = new ClientManager(true, ClientManager.Type.MetaGroupClient);
+     assertAsyncMetaClientUsable(metaManager, ClientCategory.META);
+     assertAsyncMetaClientUsable(metaManager, ClientCategory.META_HEARTBEAT);
+     assertNoAsyncClient(metaManager, ClientCategory.DATA, ClientCategory.DATA_HEARTBEAT);
+     assertNoSyncClient(metaManager, ALL_CATEGORIES);
+
+     // ---------async data(data heartbeat) clients manager test------------
+     ClientManager dataManager = new ClientManager(true, ClientManager.Type.DataGroupClient);
+     assertAsyncDataClientUsable(dataManager, ClientCategory.DATA);
+     assertAsyncDataClientUsable(dataManager, ClientCategory.DATA_HEARTBEAT);
+     assertNoAsyncClient(dataManager, ClientCategory.META, ClientCategory.META_HEARTBEAT);
+     assertNoSyncClient(dataManager, ALL_CATEGORIES);
+   }
+
+   /** Borrows a sync client, checks it is an open SyncDataClient for defaultNode, returns it. */
+   private void assertSyncDataClientUsable(ClientManager manager, ClientCategory category)
+       throws Exception {
+     RaftService.Client client = manager.borrowSyncClient(defaultNode, category);
+     Assert.assertNotNull(client);
+     Assert.assertTrue(client instanceof SyncDataClient);
+     Assert.assertEquals(defaultNode, ((SyncDataClient) client).getNode());
+     Assert.assertTrue(client.getInputProtocol().getTransport().isOpen());
+     ((SyncDataClient) client).returnSelf();
+   }
+
+   /** Borrows a sync client, checks it is an open SyncMetaClient for defaultNode, returns it. */
+   private void assertSyncMetaClientUsable(ClientManager manager, ClientCategory category)
+       throws Exception {
+     RaftService.Client client = manager.borrowSyncClient(defaultNode, category);
+     Assert.assertNotNull(client);
+     Assert.assertTrue(client instanceof SyncMetaClient);
+     Assert.assertEquals(defaultNode, ((SyncMetaClient) client).getNode());
+     Assert.assertTrue(client.getInputProtocol().getTransport().isOpen());
+     ((SyncMetaClient) client).returnSelf();
+   }
+
+   /** Borrows an async client and checks it is a valid, ready AsyncDataClient for defaultNode. */
+   private void assertAsyncDataClientUsable(ClientManager manager, ClientCategory category)
+       throws Exception {
+     RaftService.AsyncClient client = manager.borrowAsyncClient(defaultNode, category);
+     Assert.assertNotNull(client);
+     Assert.assertTrue(client instanceof AsyncDataClient);
+     Assert.assertEquals(defaultNode, ((AsyncDataClient) client).getNode());
+     Assert.assertTrue(((AsyncDataClient) client).isValid());
+     Assert.assertTrue(((AsyncDataClient) client).isReady());
+   }
+
+   /** Borrows an async client and checks it is a valid, ready AsyncMetaClient for defaultNode. */
+   private void assertAsyncMetaClientUsable(ClientManager manager, ClientCategory category)
+       throws Exception {
+     RaftService.AsyncClient client = manager.borrowAsyncClient(defaultNode, category);
+     Assert.assertNotNull(client);
+     Assert.assertTrue(client instanceof AsyncMetaClient);
+     Assert.assertEquals(defaultNode, ((AsyncMetaClient) client).getNode());
+     Assert.assertTrue(((AsyncMetaClient) client).isValid());
+     Assert.assertTrue(((AsyncMetaClient) client).isReady());
+   }
+
+   /** Asserts that borrowing a sync client yields null for each given category, in order. */
+   private void assertNoSyncClient(ClientManager manager, ClientCategory... categories)
+       throws Exception {
+     for (ClientCategory category : categories) {
+       Assert.assertNull(
+           "unexpected sync client for " + category,
+           manager.borrowSyncClient(defaultNode, category));
+     }
+   }
+
+   /** Asserts that borrowing an async client yields null for each given category, in order. */
+   private void assertNoAsyncClient(ClientManager manager, ClientCategory... categories)
+       throws Exception {
+     for (ClientCategory category : categories) {
+       Assert.assertNull(
+           "unexpected async client for " + category,
+           manager.borrowAsyncClient(defaultNode, category));
+     }
+   }
+ }
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
new file mode 100644
index 0000000..97da96e
--- /dev/null
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/ClientPoolFactoryTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.cluster.client;
+
+import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient;
+import org.apache.iotdb.cluster.config.ClusterConfig;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
+import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
+import org.apache.iotdb.cluster.utils.ClientUtils;
+
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+public class ClientPoolFactoryTest {
+ private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
+
+ private long mockMaxWaitTimeoutMs = 10 * 1000L;
+ private int mockMaxClientPerMember = 10;
+
+ private int maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
+ private long waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
+
+ private ClientPoolFactory clientPoolFactory;
+ private MockClientManager mockClientManager;
+
+ @Before
+ public void setUp() {
+ clusterConfig.setMaxClientPerNodePerMember(mockMaxClientPerMember);
+ clusterConfig.setWaitClientTimeoutMS(mockMaxWaitTimeoutMs);
+ clientPoolFactory = new ClientPoolFactory();
+ mockClientManager =
+ new MockClientManager() {
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {
+ assert (client == asyncClient);
+ }
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {
+ Assert.assertTrue(client == syncClient);
+ }
+ };
+ clientPoolFactory.setClientManager(mockClientManager);
+ }
+
+ @After
+ public void tearDown() {
+ clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
+ clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
+ }
+
+ @Test
+ public void poolConfigTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+ Node node = constructDefaultNode();
+
+ for (int i = 0; i < mockMaxClientPerMember; i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ }
+
+ long timeStart = System.currentTimeMillis();
+ try {
+ pool.borrowObject(node);
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof NoSuchElementException);
+ } finally {
+ Assert.assertTrue(System.currentTimeMillis() - timeStart + 10 > mockMaxWaitTimeoutMs);
+ }
+ }
+
+ @Test
+ public void poolRecycleTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+ Node node = constructDefaultNode();
+ List<RaftService.AsyncClient> clientList = new ArrayList<>();
+ for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ clientList.add(client);
+ }
+
+ for (RaftService.AsyncClient client : clientList) {
+ pool.returnObject(node, client);
+ }
+
+ for (int i = 0; i < pool.getMaxIdlePerKey(); i++) {
+ RaftService.AsyncClient client = pool.borrowObject(node);
+ Assert.assertNotNull(client);
+ Assert.assertTrue(clientList.contains(client));
+ }
+ }
+
+ @Test
+ public void createAsyncDataClientTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.DATA);
+
+ Assert.assertEquals(pool.getMaxTotalPerKey(), mockMaxClientPerMember);
+ Assert.assertEquals(pool.getMaxWaitDuration().getNano(), mockMaxWaitTimeoutMs * 1000000);
+
+ RaftService.AsyncClient asyncClient = null;
+
+ Node node = constructDefaultNode();
+
+ asyncClient = pool.borrowObject(node);
+ mockClientManager.setAsyncClient(asyncClient);
+ Assert.assertNotNull(asyncClient);
+ Assert.assertTrue(asyncClient instanceof AsyncDataClient);
+ }
+
+ @Test
+ public void createAsyncMetaClientTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.AsyncClient> pool =
+ clientPoolFactory.createAsyncDataPool(ClientCategory.META);
+
+ Assert.assertEquals(pool.getMaxTotalPerKey(), mockMaxClientPerMember);
+ Assert.assertEquals(pool.getMaxWaitDuration().getNano(), mockMaxWaitTimeoutMs * 1000000);
+
+ Node node = constructDefaultNode();
+
+ RaftService.AsyncClient asyncClient = null;
+ asyncClient = pool.borrowObject(node);
+ mockClientManager.setAsyncClient(asyncClient);
+ Assert.assertNotNull(asyncClient);
+ Assert.assertTrue(asyncClient instanceof AsyncMetaClient);
+ }
+
+ @Test
+ public void createSyncDataClientTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.Client> pool =
+ clientPoolFactory.createSyncDataPool(ClientCategory.DATA_HEARTBEAT);
+
+ Assert.assertEquals(pool.getMaxTotalPerKey(), mockMaxClientPerMember);
+ Assert.assertEquals(pool.getMaxWaitDuration().getNano(), mockMaxWaitTimeoutMs * 1000000);
+
+ Node node = constructDefaultNode();
+
+ RaftService.Client client = null;
+ ServerSocket serverSocket =
+ new ServerSocket(ClientUtils.getPort(node, ClientCategory.DATA_HEARTBEAT));
+ Thread listenThread = null;
+ try {
+ listenThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ serverSocket.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ listenThread.start();
+
+ client = pool.borrowObject(node);
+ mockClientManager.setSyncClient(client);
+ Assert.assertNotNull(client);
+ Assert.assertTrue(client instanceof SyncDataClient);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ ((SyncDataClient) client).returnSelf();
+ if (serverSocket != null) {
+ serverSocket.close();
+ listenThread.interrupt();
+ listenThread.join();
+ }
+ }
+ }
+
+ @Test
+ public void createSyncMetaClientTest() throws Exception {
+ GenericKeyedObjectPool<Node, RaftService.Client> pool =
+ clientPoolFactory.createSyncMetaPool(ClientCategory.META_HEARTBEAT);
+
+ Assert.assertEquals(pool.getMaxTotalPerKey(), mockMaxClientPerMember);
+ Assert.assertEquals(pool.getMaxWaitDuration().getNano(), mockMaxWaitTimeoutMs * 1000000);
+
+ Node node = constructDefaultNode();
+
+ RaftService.Client client = null;
+ ServerSocket serverSocket =
+ new ServerSocket(ClientUtils.getPort(node, ClientCategory.META_HEARTBEAT));
+ Thread listenThread = null;
+ try {
+ listenThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ serverSocket.accept();
+ } catch (IOException e) {
+ return;
+ }
+ }
+ });
+ listenThread.start();
+
+ client = pool.borrowObject(node);
+ mockClientManager.setSyncClient(client);
+ Assert.assertNotNull(client);
+ Assert.assertTrue(client instanceof SyncMetaClient);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ ((SyncMetaClient) client).returnSelf();
+ if (serverSocket != null) {
+ serverSocket.close();
+ listenThread.interrupt();
+ listenThread.join();
+ }
+ }
+ }
+
+ private Node constructDefaultNode() {
+ Node node = new Node();
+ node.setMetaPort(9003).setInternalIp("localhost").setClientIp("localhost");
+ node.setDataPort(40010);
+ return node;
+ }
+}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
deleted file mode 100644
index d2ee0b7..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/DataClientProviderTest.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client;
-
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.utils.ClientUtils;
-import org.apache.iotdb.cluster.utils.ClusterNode;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import static org.junit.Assert.assertNotNull;
-
-public class DataClientProviderTest {
- private ClusterConfig clusterConfig = ClusterDescriptor.getInstance().getConfig();
- private int maxClientPerNodePerMember;
- private long waitClientTimeoutMS;
-
- @Before
- public void setUp() {
- maxClientPerNodePerMember = clusterConfig.getMaxClientPerNodePerMember();
- waitClientTimeoutMS = clusterConfig.getWaitClientTimeoutMS();
- clusterConfig.setMaxClientPerNodePerMember(2);
- clusterConfig.setWaitClientTimeoutMS(10L);
- }
-
- @After
- public void tearDown() {
- clusterConfig.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
- clusterConfig.setWaitClientTimeoutMS(waitClientTimeoutMS);
- }
-
- @Test
- public void testAsync() throws IOException {
- boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
- DataClientProvider provider = new DataClientProvider(new Factory());
-
- assertNotNull(provider.getAsyncDataClient(TestUtils.getNode(0), 100));
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
- }
-
- @Test
- public void testSync() throws IOException, InterruptedException {
- Node node = new Node();
- node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
- try {
- boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(false);
- DataClientProvider provider = new DataClientProvider(new Factory());
- SyncDataClient client = null;
- try {
- client = provider.getSyncDataClient(node, 100);
- } catch (TException e) {
- Assert.fail(e.getMessage());
- } finally {
- ClientUtils.putBackSyncClient(client);
- }
- assertNotNull(client);
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
- } finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
- }
- }
-
- @Test
- public void testSyncConcurrency() throws IOException, InterruptedException {
- Node node = new ClusterNode();
- node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
- try {
- boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(false);
- DataClientProvider provider = new DataClientProvider(new Factory());
- SyncDataClient client = null;
- try {
- client = provider.getSyncDataClient(node, 100);
- } catch (TException e) {
- Assert.fail(e.getMessage());
- }
- assertNotNull(client);
-
- // now try to test multi thread
- ExecutorService service = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 5; i++) {
- service.submit(() -> provider.getSyncDataClient(node, 100));
- }
-
- // wait time should be great then 10 * 5ms
- Thread.currentThread().sleep(1000);
- int totalNumber = provider.getDataSyncClientPool().getNodeClientNumMap().get(node);
- Assert.assertEquals(6, totalNumber);
-
- for (int i = 0; i < 4; i++) {
- service.submit(() -> provider.getSyncDataClient(node, 100));
- }
-
- Thread.currentThread().sleep(100);
- // return one client to pool
- provider.getDataSyncClientPool().putClient(node, client);
- // wait all finish
- Thread.currentThread().sleep(1000);
- totalNumber = provider.getDataSyncClientPool().getNodeClientNumMap().get(node);
-
- // 6 + 4
- Assert.assertEquals(10, totalNumber);
-
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
- } catch (Exception e) {
- Assert.fail(e.getMessage());
- } finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
- }
- }
-
- @Test
- public void testAsyncConcurrency() throws IOException, InterruptedException {
- Node node = new ClusterNode();
- node.setDataPort(9003).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
- try {
- boolean useAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
- DataClientProvider provider = new DataClientProvider(new Factory());
- AsyncDataClient client = null;
- try {
- client = provider.getAsyncDataClient(node, 100);
- } catch (IOException e) {
- Assert.fail(e.getMessage());
- }
- assertNotNull(client);
-
- // now try to test multi thread
- ExecutorService service = Executors.newFixedThreadPool(10);
- for (int i = 0; i < 5; i++) {
- service.submit(() -> provider.getAsyncDataClient(node, 100));
- }
-
- // wait time should be great then 10ms
- Thread.currentThread().sleep(1000);
- int totalNumber = provider.getDataAsyncClientPool().getNodeClientNumMap().get(node);
- Assert.assertEquals(6, totalNumber);
-
- for (int i = 0; i < 4; i++) {
- service.submit(() -> provider.getAsyncDataClient(node, 100));
- }
-
- Thread.currentThread().sleep(100);
- // return one client to pool, but number do not -1
- provider.getDataAsyncClientPool().putClient(node, client);
- // wait all finish
- Thread.currentThread().sleep(1000);
- totalNumber = provider.getDataAsyncClientPool().getNodeClientNumMap().get(node);
-
- // 6 + 4
- Assert.assertEquals(10, totalNumber);
-
- ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(useAsyncServer);
- } catch (Exception e) {
- Assert.fail(e.getMessage());
- } finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
- }
- }
-}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/MockClientManager.java
similarity index 60%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
rename to cluster/src/test/java/org/apache/iotdb/cluster/client/MockClientManager.java
index fa0da62..c3153a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncClientFactory.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/MockClientManager.java
@@ -17,28 +17,31 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.sync;
+package org.apache.iotdb.cluster.client;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
-import org.apache.thrift.transport.TTransportException;
+public abstract class MockClientManager implements IClientManager {
-import java.io.IOException;
+ RaftService.AsyncClient asyncClient;
+ RaftService.Client syncClient;
-public interface SyncClientFactory {
+ public void setAsyncClient(RaftService.AsyncClient asyncClient) {
+ this.asyncClient = asyncClient;
+ }
+
+ public void setSyncClient(RaftService.Client client) {
+ this.syncClient = client;
+ }
- /**
- * Get a client which will connect the given node and be cached in the given pool.
- *
- * @param node the cluster node the client will connect.
- * @param pool the pool that will cache the client for reusing.
- * @return
- * @throws IOException
- */
- RaftService.Client getSyncClient(Node node, SyncClientPool pool) throws TTransportException;
+ @Override
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category) {
+ return null;
+ }
- default String nodeInfo(Node node) {
- return node.toString();
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
deleted file mode 100644
index a08d9fe..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless [...]
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.Factory;
-import org.apache.iotdb.cluster.common.TestAsyncClient;
-import org.apache.iotdb.cluster.common.TestAsyncClientFactory;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public class AsyncClientPoolTest {
-
- @Mock private AsyncClientFactory testAsyncClientFactory;
-
- private ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- private boolean isAsyncServer;
-
- @Before
- public void setUp() {
- isAsyncServer = config.isUseAsyncServer();
- config.setUseAsyncServer(true);
- }
-
- @After
- public void tearDown() {
- config.setUseAsyncServer(isAsyncServer);
- }
-
- @Test
- public void testTestClient() throws IOException {
- testAsyncClientFactory = new TestAsyncClientFactory();
- getClient();
- putClient();
- }
-
- @Test
- public void testDataClient() throws IOException {
- testAsyncClientFactory = new Factory(new TBinaryProtocol.Factory());
- getClient();
- putClient();
- }
-
- @Test
- public void testMetaClient() throws IOException {
- testAsyncClientFactory = new AsyncMetaClient.Factory(new TBinaryProtocol.Factory());
- getClient();
- putClient();
- }
-
- private void getClient() throws IOException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
- for (int i = 0; i < 10; i++) {
- AsyncClient client = asyncClientPool.getClient(TestUtils.getNode(i));
- if (client instanceof TestAsyncClient) {
- TestAsyncClient testAsyncClient = (TestAsyncClient) client;
- assertEquals(i, testAsyncClient.getSerialNum());
- }
- }
- }
-
- private void putClient() throws IOException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
- List<AsyncClient> testClients = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- AsyncClient client = asyncClientPool.getClient(TestUtils.getNode(i));
- testClients.add(client);
- }
- if (testAsyncClientFactory instanceof TestAsyncClientFactory) {
- for (int i = 0; i < 10; i++) {
- asyncClientPool.putClient(TestUtils.getNode(i), testClients.get(i));
- }
- } else if (testAsyncClientFactory instanceof AsyncMetaClient.Factory) {
- for (AsyncClient testClient : testClients) {
- ((AsyncMetaClient) testClient).onComplete();
- }
- } else if (testAsyncClientFactory instanceof Factory) {
- for (AsyncClient testClient : testClients) {
- ((AsyncDataClient) testClient).onComplete();
- }
- }
-
- for (int i = 0; i < 10; i++) {
- AsyncClient poolClient = asyncClientPool.getClient(TestUtils.getNode(i));
- assertEquals(testClients.get(i), poolClient);
- }
- }
-
- @Test
- public void testMaxClient() throws IOException {
- int maxClientNum = config.getMaxClientPerNodePerMember();
- config.setMaxClientPerNodePerMember(5);
- testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
-
- for (int i = 0; i < 5; i++) {
- asyncClientPool.getClient(TestUtils.getNode(0));
- }
- AtomicReference<AsyncClient> reference = new AtomicReference<>();
- Thread t =
- new Thread(
- () -> {
- try {
- reference.set(asyncClientPool.getClient(TestUtils.getNode(0)));
- } catch (IOException e) {
- e.printStackTrace();
- }
- });
- t.start();
- t.interrupt();
- assertNull(reference.get());
- config.setMaxClientPerNodePerMember(maxClientNum);
- }
-
- @Test
- public void testWaitClient() throws IOException {
- int maxClientPerNodePerMember = config.getMaxClientPerNodePerMember();
- try {
- config.setMaxClientPerNodePerMember(10);
- testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
-
- Node node = TestUtils.getNode(0);
- List<AsyncClient> clients = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- clients.add(asyncClientPool.getClient(node));
- }
-
- AtomicBoolean waitStart = new AtomicBoolean(false);
- new Thread(
- () -> {
- while (!waitStart.get()) {
- // wait until we start to for wait for a client
- }
- synchronized (asyncClientPool) {
- for (AsyncClient client : clients) {
- asyncClientPool.putClient(node, client);
- }
- }
- })
- .start();
-
- AsyncClient client;
- synchronized (asyncClientPool) {
- waitStart.set(true);
- // getClient() will wait on asyncClientPool, so the thread above can return clients
- client = asyncClientPool.getClient(node);
- }
- assertNotNull(client);
- } finally {
- config.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
- }
- }
-
- @Test
- public void testWaitClientTimeOut() throws IOException {
- int maxClientPerNodePerMember = config.getMaxClientPerNodePerMember();
- try {
- config.setMaxClientPerNodePerMember(1);
- testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
-
- Node node = TestUtils.getNode(0);
- List<AsyncClient> clients = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- clients.add(asyncClientPool.getClient(node));
- }
-
- assertNotEquals(clients.get(0), clients.get(1));
- } finally {
- config.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
- }
- }
-
- @Test
- public void testRecreateClient() throws IOException {
- testAsyncClientFactory = new TestAsyncClientFactory();
- AsyncClientPool asyncClientPool =
- new AsyncClientPool(new AsyncMetaClient.Factory(new TBinaryProtocol.Factory()));
-
- AsyncMetaClient client = (AsyncMetaClient) asyncClientPool.getClient(TestUtils.getNode(0));
- client.onError(new Exception());
-
- AsyncClient newClient = asyncClientPool.getClient(TestUtils.getNode(0));
- assertNotEquals(client, newClient);
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
index 0c2a666..5c95e18 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataClientTest.java
@@ -4,84 +4,76 @@
package org.apache.iotdb.cluster.client.async;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.SingleManagerFactory;
-import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.junit.After;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-public class AsyncDataClientTest {
+public class AsyncDataClientTest extends BaseClientTest {
private final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- private boolean isAsyncServer;
+ private TProtocolFactory protocolFactory;
@Before
public void setUp() {
- isAsyncServer = config.isUseAsyncServer();
config.setUseAsyncServer(true);
+ protocolFactory =
+ config.isRpcThriftCompressionEnabled()
+ ? new TCompactProtocol.Factory()
+ : new TBinaryProtocol.Factory();
}
- @After
- public void tearDown() {
- config.setUseAsyncServer(isAsyncServer);
+ @Test
+ public void testDataClient() throws Exception {
+
+ AsyncDataClient.AsyncDataClientFactory factory =
+ new AsyncDataClient.AsyncDataClientFactory(protocolFactory, ClientCategory.DATA);
+
+ AsyncDataClient dataClient = factory.makeObject(defaultNode).getObject();
+
+ assertEquals(
+ "AsyncDataClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=40010}",
+ dataClient.toString());
+ assertCheck(dataClient);
}
@Test
- public void test() throws IOException, TException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(new SingleManagerFactory(new Factory()));
- AsyncDataClient client;
- Node node = TestUtils.getNode(0);
- client =
- new AsyncDataClient(
- new Factory(),
- new TAsyncClientManager(),
- new TNonblockingSocket(
- node.getInternalIp(),
- node.getDataPort(),
- ClusterConstant.getConnectionTimeoutInMS()));
- assertTrue(client.isReady());
-
- client = (AsyncDataClient) asyncClientPool.getClient(TestUtils.getNode(0));
-
- assertEquals(TestUtils.getNode(0), client.getNode());
-
- client.matchTerm(
- 0,
- 0,
- TestUtils.getRaftNode(0, 0),
- new AsyncMethodCallback<Boolean>() {
- @Override
- public void onComplete(Boolean aBoolean) {
- // do nothing
- }
-
- @Override
- public void onError(Exception e) {
- // do nothing
- }
- });
- assertFalse(client.isReady());
-
- client.onError(new Exception());
- assertNull(client.getCurrMethod());
- assertFalse(client.isReady());
+ public void testMetaHeartbeatClient() throws Exception {
+
+ AsyncDataClient.AsyncDataClientFactory factory =
+ new AsyncDataClient.AsyncDataClientFactory(protocolFactory, ClientCategory.DATA_HEARTBEAT);
+
+ AsyncDataClient dataClient = factory.makeObject(defaultNode).getObject();
assertEquals(
- "DataClient{node=ClusterNode{ internalIp='192.168.0.0', metaPort=9003, nodeIdentifier=0, dataPort=40010, clientPort=6667, clientIp='0.0.0.0'}}",
- client.toString());
+ "AsyncDataHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=40011}",
+ dataClient.toString());
+ assertCheck(dataClient);
+ }
+
+ private void assertCheck(AsyncDataClient dataClient) {
+ Assert.assertNotNull(dataClient);
+ assertTrue(dataClient.isReady());
+ assertTrue(dataClient.isValid());
+ Assert.assertEquals(dataClient.getNode(), defaultNode);
+
+ dataClient.setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ Assert.assertEquals(dataClient.getTimeout(), ClusterConstant.getConnectionTimeoutInMS());
+
+ dataClient.close();
+ Assert.assertNull(dataClient.getCurrMethod());
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
deleted file mode 100644
index c5303c8..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-
-import junit.framework.TestCase;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class AsyncDataHeartbeatClientTest extends TestCase {
-
- private final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- private boolean isAsyncServer;
-
- @Before
- public void setUp() {
- isAsyncServer = config.isUseAsyncServer();
- config.setUseAsyncServer(true);
- }
-
- @After
- public void tearDown() {
- config.setUseAsyncServer(isAsyncServer);
- }
-
- @Test
- public void test() throws IOException {
- AsyncDataHeartbeatClient.Factory factoryAsync =
- new AsyncDataHeartbeatClient.Factory(new Factory());
- AsyncClient asyncClient = factoryAsync.getAsyncClient(TestUtils.getNode(0), null);
- assertEquals(
- "AsyncDataHeartbeatClient{node=Node(internalIp:192.168.0.0, metaPort:9003, nodeIdentifier:0, dataPort:40010, clientPort:6667, clientIp:0.0.0.0),dataHeartbeatPort=40011}",
- asyncClient.toString());
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index 53cc1a0..af58869 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -4,87 +4,75 @@
package org.apache.iotdb.cluster.client.async;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient.Factory;
-import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.thrift.TException;
-import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.transport.TNonblockingSocket;
-import org.junit.After;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.IOException;
-
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-public class AsyncMetaClientTest {
+public class AsyncMetaClientTest extends BaseClientTest {
private final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- private boolean isAsyncServer;
+ private TProtocolFactory protocolFactory;
@Before
public void setUp() {
- isAsyncServer = config.isUseAsyncServer();
config.setUseAsyncServer(true);
+ protocolFactory =
+ config.isRpcThriftCompressionEnabled()
+ ? new TCompactProtocol.Factory()
+ : new TBinaryProtocol.Factory();
}
- @After
- public void tearDown() {
- config.setUseAsyncServer(isAsyncServer);
+ @Test
+ public void testMetaClient() throws Exception {
+
+ AsyncMetaClient.AsyncMetaClientFactory factory =
+ new AsyncMetaClient.AsyncMetaClientFactory(protocolFactory, ClientCategory.META);
+
+ AsyncMetaClient metaClient = factory.makeObject(defaultNode).getObject();
+
+ assertEquals(
+ "AsyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+ metaClient.toString());
+ assertCheck(metaClient);
}
@Test
- public void test() throws IOException, TException {
- AsyncClientPool asyncClientPool =
- new AsyncClientPool(new Factory(new org.apache.thrift.protocol.TBinaryProtocol.Factory()));
- AsyncMetaClient client;
- Node node = TestUtils.getNode(0);
- client =
- new AsyncMetaClient(
- new org.apache.thrift.protocol.TBinaryProtocol.Factory(),
- new TAsyncClientManager(),
- new TNonblockingSocket(
- node.getInternalIp(),
- node.getMetaPort(),
- ClusterConstant.getConnectionTimeoutInMS()));
- assertTrue(client.isReady());
-
- client = (AsyncMetaClient) asyncClientPool.getClient(TestUtils.getNode(0));
-
- assertEquals(TestUtils.getNode(0), client.getNode());
-
- client.matchTerm(
- 0,
- 0,
- TestUtils.getRaftNode(0, 0),
- new AsyncMethodCallback<Boolean>() {
- @Override
- public void onComplete(Boolean aBoolean) {
- // do nothing
- }
-
- @Override
- public void onError(Exception e) {
- // do nothing
- }
- });
- assertFalse(client.isReady());
-
- client.onError(new Exception());
- assertNull(client.getCurrMethod());
- assertTrue(client.isReady());
+ public void testMetaHeartbeatClient() throws Exception {
+ AsyncMetaClient.AsyncMetaClientFactory factory =
+ new AsyncMetaClient.AsyncMetaClientFactory(protocolFactory, ClientCategory.META_HEARTBEAT);
+
+ AsyncMetaClient metaClient = factory.makeObject(defaultNode).getObject();
assertEquals(
- "MetaClient{node=ClusterNode{ internalIp='192.168.0.0', metaPort=9003, nodeIdentifier=0, dataPort=40010, clientPort=6667, clientIp='0.0.0.0'}}",
- client.toString());
+ "AsyncMetaHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=9004}",
+ metaClient.toString());
+ assertCheck(metaClient);
+ }
+
+ private void assertCheck(AsyncMetaClient dataClient) {
+ Assert.assertNotNull(dataClient);
+ assertTrue(dataClient.isReady());
+ assertTrue(dataClient.isValid());
+ Assert.assertEquals(dataClient.getNode(), defaultNode);
+
+ dataClient.setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ Assert.assertEquals(dataClient.getTimeout(), ClusterConstant.getConnectionTimeoutInMS());
+
+ dataClient.close();
+ Assert.assertNull(dataClient.getCurrMethod());
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
deleted file mode 100644
index 4cee53a..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.async;
-
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterConfig;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-public class AsyncMetaHeartbeatClientTest {
-
- private final ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
- private boolean isAsyncServer;
-
- @Before
- public void setUp() {
- isAsyncServer = config.isUseAsyncServer();
- config.setUseAsyncServer(true);
- }
-
- @After
- public void tearDown() {
- config.setUseAsyncServer(isAsyncServer);
- }
-
- @Test
- public void test() throws IOException {
- AsyncMetaHeartbeatClient.Factory factoryAsync =
- new AsyncMetaHeartbeatClient.Factory(new TBinaryProtocol.Factory());
- AsyncClient asyncClient = factoryAsync.getAsyncClient(TestUtils.getNode(0), null);
- Assert.assertEquals(
- "AsyncMetaHeartbeatClient{node=Node(internalIp:192.168.0.0, metaPort:9003, "
- + "nodeIdentifier:0, dataPort:40010, clientPort:6667, clientIp:0.0.0.0),metaHeartbeatPort=9004}",
- asyncClient.toString());
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
deleted file mode 100644
index d14a31b..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientPoolTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless [...]
- */
-
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.common.TestSyncClient;
-import org.apache.iotdb.cluster.common.TestSyncClientFactory;
-import org.apache.iotdb.cluster.common.TestUtils;
-import org.apache.iotdb.cluster.config.ClusterDescriptor;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mock;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-
-public class SyncClientPoolTest {
-
- @Mock private SyncClientFactory testSyncClientFactory;
-
- @Before
- public void setUp() {
- testSyncClientFactory = new TestSyncClientFactory();
- }
-
- @After
- public void tearDown() {
- testSyncClientFactory = null;
- }
-
- @Test
- public void testTestClient() {
- getClient();
- putClient();
- }
-
- private void getClient() {
- SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
- for (int i = 0; i < 10; i++) {
- Client client = syncClientPool.getClient(TestUtils.getNode(i));
- if (client instanceof TestSyncClient) {
- TestSyncClient testSyncClient = (TestSyncClient) client;
- assertEquals(i, testSyncClient.getSerialNum());
- }
- }
- }
-
- private void putClient() {
- SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
- List<Client> testClients = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- Client client = syncClientPool.getClient(TestUtils.getNode(i));
- testClients.add(client);
- }
- for (int i = 0; i < 10; i++) {
- syncClientPool.putClient(TestUtils.getNode(i), testClients.get(i));
- }
-
- for (int i = 0; i < 10; i++) {
- Client poolClient = syncClientPool.getClient(TestUtils.getNode(i));
- assertEquals(testClients.get(i), poolClient);
- }
- }
-
- @Test
- public void testPutBadClient() {
- SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
- Client client = syncClientPool.getClient(TestUtils.getNode(0));
- client.getInputProtocol().getTransport().close();
- syncClientPool.putClient(TestUtils.getNode(0), client);
- Client newClient = syncClientPool.getClient(TestUtils.getNode(0));
- assertNotEquals(client, newClient);
- }
-
- @Test
- public void testMaxClient() {
- int maxClientNum = ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
- ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(5);
- SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
-
- for (int i = 0; i < 5; i++) {
- syncClientPool.getClient(TestUtils.getNode(0));
- }
- AtomicReference<Client> reference = new AtomicReference<>();
- Thread t = new Thread(() -> reference.set(syncClientPool.getClient(TestUtils.getNode(0))));
- t.start();
- t.interrupt();
- assertNull(reference.get());
- ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(maxClientNum);
- }
-
- @Test
- public void testWaitClient() {
- int maxClientPerNodePerMember =
- ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
- try {
- ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(10);
- SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
-
- Node node = TestUtils.getNode(0);
- List<Client> clients = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- clients.add(syncClientPool.getClient(node));
- }
-
- AtomicBoolean waitStart = new AtomicBoolean(false);
- new Thread(
- () -> {
- while (!waitStart.get()) {
- // wait until we start to for wait for a client
- }
- synchronized (syncClientPool) {
- for (Client client : clients) {
- syncClientPool.putClient(node, client);
- }
- }
- })
- .start();
-
- Client client;
- synchronized (syncClientPool) {
- waitStart.set(true);
- // getClient() will wait on syncClientPool, so the thread above can return clients
- client = syncClientPool.getClient(node);
- }
- assertNotNull(client);
- } finally {
- ClusterDescriptor.getInstance()
- .getConfig()
- .setMaxClientPerNodePerMember(maxClientPerNodePerMember);
- }
- }
-
- @Test
- public void testWaitClientTimeOut() {
- int maxClientPerNodePerMember =
- ClusterDescriptor.getInstance().getConfig().getMaxClientPerNodePerMember();
- try {
- ClusterDescriptor.getInstance().getConfig().setMaxClientPerNodePerMember(1);
- SyncClientPool syncClientPool = new SyncClientPool(testSyncClientFactory);
-
- Node node = TestUtils.getNode(0);
- List<Client> clients = new ArrayList<>();
- for (int i = 0; i < 2; i++) {
- clients.add(syncClientPool.getClient(node));
- }
-
- assertNotEquals(clients.get(0), clients.get(1));
- } finally {
- ClusterDescriptor.getInstance()
- .getConfig()
- .setMaxClientPerNodePerMember(maxClientPerNodePerMember);
- }
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
index 14566f9..1c76927 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
@@ -4,118 +4,107 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient.Factory;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.rpc.TSocketWrapper;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.net.ServerSocket;
+import java.net.SocketException;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-public class SyncDataClientTest {
+public class SyncDataClientTest extends BaseClientTest {
- @Test
- public void test() throws IOException, InterruptedException {
- Node node = new Node();
- node.setDataPort(40010).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
- try {
- SyncClientPool syncClientPool =
- new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
- SyncDataClient client;
- client = (SyncDataClient) syncClientPool.getClient(node);
-
- assertEquals(node, client.getTarget());
+ private TProtocolFactory protocolFactory;
- client.setTimeout(1000);
- assertEquals(1000, client.getTimeout());
-
- client.putBack();
- Client newClient = syncClientPool.getClient(node);
- assertEquals(client, newClient);
- assertTrue(client.getInputProtocol().getTransport().isOpen());
-
- assertEquals("SyncDataClient (ip = localhost, port = 40010, id = 0)", client.toString());
+ @Before
+ public void setUp() {
+ protocolFactory =
+ ClusterDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
+ ? new TCompactProtocol.Factory()
+ : new TBinaryProtocol.Factory();
+ }
- client =
- new SyncDataClient(
- new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
- // client without a belong pool will be closed after putBack()
- client.putBack();
- assertFalse(client.getInputProtocol().getTransport().isOpen());
+ @Test
+ public void testDataClient() throws IOException, InterruptedException, TTransportException {
+ try {
+ startDataServer();
+ SyncDataClient dataClient =
+ new SyncDataClient(protocolFactory, defaultNode, ClientCategory.DATA);
+
+ assertEquals(
+ "SyncDataClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=40010}",
+ dataClient.toString());
+
+ assertCheck(dataClient);
+
+ dataClient =
+ new SyncDataClient.SyncDataClientFactory(protocolFactory, ClientCategory.DATA)
+ .makeObject(defaultNode)
+ .getObject();
+
+ assertEquals(
+ "SyncDataClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=40010}",
+ dataClient.toString());
+
+ assertCheck(dataClient);
+ } catch (Exception e) {
+ e.printStackTrace();
} finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
+ stopDataServer();
}
}
@Test
- public void testTryClose() throws IOException, InterruptedException {
- Node node = new Node();
- node.setDataPort(40010).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
+ public void testDataHeartbeatClient()
+ throws IOException, InterruptedException, TTransportException {
try {
- SyncClientPool syncClientPool =
- new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
- SyncDataClient clientOut;
- try (SyncDataClient clientIn = (SyncDataClient) syncClientPool.getClient(node)) {
- assertEquals(node, clientIn.getTarget());
- clientIn.setTimeout(1000);
- clientOut = clientIn;
- assertEquals(1000, clientIn.getTimeout());
- }
- assertTrue(clientOut.getInputProtocol().getTransport().isOpen());
-
- try (SyncDataClient newClient = (SyncDataClient) syncClientPool.getClient(node)) {
- assertEquals(clientOut, newClient);
- assertEquals("SyncDataClient (ip = localhost, port = 40010, id = 0)", newClient.toString());
- }
-
- try (SyncDataClient clientIn =
- new SyncDataClient(
- new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())))) {
- clientOut = clientIn;
- }
- // client without a belong pool will be closed after putBack()
- assertFalse(clientOut.getInputProtocol().getTransport().isOpen());
+ startDataHeartbeatServer();
+ SyncDataClient dataHeartbeatClient =
+ new SyncDataClient(protocolFactory, defaultNode, ClientCategory.DATA_HEARTBEAT);
+
+ assertCheck(dataHeartbeatClient);
+ assertEquals(
+ "SyncDataHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=40011}",
+ dataHeartbeatClient.toString());
+
+ dataHeartbeatClient =
+ new SyncDataClient.SyncDataClientFactory(protocolFactory, ClientCategory.DATA_HEARTBEAT)
+ .makeObject(defaultNode)
+ .getObject();
+ assertCheck(dataHeartbeatClient);
+ assertEquals(
+ "SyncDataHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=40011}",
+ dataHeartbeatClient.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
} finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
+ stopDataHeartbeatServer();
}
}
+
+ private void assertCheck(SyncDataClient dataClient) throws SocketException {
+ Assert.assertNotNull(dataClient);
+ Assert.assertTrue(dataClient.getInputProtocol().getTransport().isOpen());
+ Assert.assertEquals(dataClient.getNode(), defaultNode);
+
+ dataClient.setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ Assert.assertEquals(dataClient.getTimeout(), ClusterConstant.getConnectionTimeoutInMS());
+
+ dataClient.close();
+ Assert.assertFalse(dataClient.getInputProtocol().getTransport().isOpen());
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
deleted file mode 100644
index 8bccd22..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-public class SyncDataHeartbeatClientTest {
-
- @Test
- public void test() throws IOException, TTransportException, InterruptedException {
- Node node = new Node();
- node.setDataPort(40010).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getDataPort() + 1);
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
- try {
- SyncDataHeartbeatClient.Factory factoryAsync =
- new SyncDataHeartbeatClient.Factory(new Factory());
- SyncDataHeartbeatClient syncClient = factoryAsync.getSyncClient(node, null);
- Assert.assertEquals(
- "SyncDataHBClient (ip = localhost, port = 40011, id = 0)", syncClient.toString());
- } finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
- }
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
index 2bc64dd..088d8a9 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
@@ -4,112 +4,107 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient.Factory;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-import org.apache.iotdb.rpc.TSocketWrapper;
+import org.apache.iotdb.cluster.client.BaseClientTest;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.config.ClusterConstant;
+import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.transport.TTransportException;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
-import java.net.ServerSocket;
+import java.net.SocketException;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-public class SyncMetaClientTest {
+public class SyncMetaClientTest extends BaseClientTest {
- @Test
- public void test() throws IOException, InterruptedException {
- Node node = new Node();
- node.setMetaPort(9003).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getMetaPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
+ private TProtocolFactory protocolFactory;
+
+ @Before
+ public void setUp() {
+ protocolFactory =
+ ClusterDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
+ ? new TCompactProtocol.Factory()
+ : new TBinaryProtocol.Factory();
+ }
+ @Test
+ public void testMetaClient() throws IOException, InterruptedException, TTransportException {
try {
- SyncClientPool syncClientPool =
- new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
- SyncMetaClient client;
- client = (SyncMetaClient) syncClientPool.getClient(node);
-
- assertEquals(node, client.getTarget());
-
- client.putBack();
- Client newClient = syncClientPool.getClient(node);
- assertEquals(client, newClient);
- assertTrue(client.getInputProtocol().getTransport().isOpen());
-
- client =
- new SyncMetaClient(
- new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())));
- // client without a belong pool will be closed after putBack()
- client.putBack();
- assertFalse(client.getInputProtocol().getTransport().isOpen());
+ startMetaServer();
+ SyncMetaClient metaClient =
+ new SyncMetaClient(protocolFactory, defaultNode, ClientCategory.META);
+
+ assertEquals(
+ "SyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+ metaClient.toString());
+
+ assertCheck(metaClient);
+
+ metaClient =
+ new SyncMetaClient.SyncMetaClientFactory(protocolFactory, ClientCategory.META)
+ .makeObject(defaultNode)
+ .getObject();
+
+ assertEquals(
+ "SyncMetaClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=9003}",
+ metaClient.toString());
+
+ assertCheck(metaClient);
+ } catch (Exception e) {
+ e.printStackTrace();
} finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
+ stopMetaServer();
}
}
@Test
- public void testTryClose() throws IOException, InterruptedException {
- Node node = new Node();
- node.setMetaPort(9003).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getMetaPort());
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
+ public void testDataHeartbeatClient()
+ throws IOException, InterruptedException, TTransportException {
try {
- SyncClientPool syncClientPool =
- new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
- SyncMetaClient clientOut;
- try (SyncMetaClient clientIn = (SyncMetaClient) syncClientPool.getClient(node); ) {
- assertEquals(node, clientIn.getTarget());
- clientOut = clientIn;
- }
-
- try (SyncMetaClient newClientIn = (SyncMetaClient) syncClientPool.getClient(node)) {
- assertEquals(node, newClientIn.getTarget());
- assertEquals(clientOut, newClientIn);
- }
- assertTrue(clientOut.getInputProtocol().getTransport().isOpen());
-
- try (SyncMetaClient clientIn =
- new SyncMetaClient(
- new TBinaryProtocol(TSocketWrapper.wrap(node.getInternalIp(), node.getDataPort())))) {
- clientOut = clientIn;
- }
-
- // client without a belong pool will be closed after putBack()
- assertFalse(clientOut.getInputProtocol().getTransport().isOpen());
+ startMetaHeartbeatServer();
+ SyncMetaClient metaHeartbeatClient =
+ new SyncMetaClient(protocolFactory, defaultNode, ClientCategory.META_HEARTBEAT);
+
+ assertCheck(metaHeartbeatClient);
+ assertEquals(
+ "SyncMetaHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=9004}",
+ metaHeartbeatClient.toString());
+
+ metaHeartbeatClient =
+ new SyncMetaClient.SyncMetaClientFactory(protocolFactory, ClientCategory.META_HEARTBEAT)
+ .makeObject(defaultNode)
+ .getObject();
+ assertCheck(metaHeartbeatClient);
+ assertEquals(
+ "SyncMetaHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003, nodeIdentifier:0, "
+ + "dataPort:40010, clientPort:0, clientIp:localhost),port=9004}",
+ metaHeartbeatClient.toString());
+ } catch (Exception e) {
+ e.printStackTrace();
} finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
+ stopMetaHeartbeatServer();
}
}
+
+ private void assertCheck(SyncMetaClient metaClient) throws SocketException {
+ Assert.assertNotNull(metaClient);
+ Assert.assertTrue(metaClient.getInputProtocol().getTransport().isOpen());
+ Assert.assertEquals(metaClient.getNode(), defaultNode);
+
+ metaClient.setTimeout(ClusterConstant.getConnectionTimeoutInMS());
+ Assert.assertEquals(metaClient.getTimeout(), ClusterConstant.getConnectionTimeoutInMS());
+
+ metaClient.close();
+ Assert.assertFalse(metaClient.getInputProtocol().getTransport().isOpen());
+ }
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
deleted file mode 100644
index c01d6a9..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.client.sync;
-
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.transport.TTransportException;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-
-public class SyncMetaHeartbeatClientTest {
-
- @Test
- public void test() throws IOException, TTransportException, InterruptedException {
- Node node = new Node();
- node.setMetaPort(9003).setInternalIp("localhost").setClientIp("localhost");
- ServerSocket serverSocket = new ServerSocket(node.getMetaPort() + 1);
- Thread listenThread =
- new Thread(
- () -> {
- while (!Thread.interrupted()) {
- try {
- serverSocket.accept();
- } catch (IOException e) {
- return;
- }
- }
- });
- listenThread.start();
-
- try {
- SyncMetaHeartbeatClient.Factory factoryAsync =
- new SyncMetaHeartbeatClient.Factory(new TBinaryProtocol.Factory());
- SyncMetaHeartbeatClient syncClient = factoryAsync.getSyncClient(node, null);
- Assert.assertEquals(
- "SyncMetaHBClient (ip = localhost, port = 9004, id = 0)", syncClient.toString());
- } finally {
- serverSocket.close();
- listenThread.interrupt();
- listenThread.join();
- }
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java
deleted file mode 100644
index cad2cc6..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncClientFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.common;
-
-import org.apache.iotdb.cluster.client.async.AsyncClientFactory;
-import org.apache.iotdb.cluster.client.async.AsyncClientPool;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
-import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.protocol.TProtocolFactory;
-
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestAsyncClientFactory extends AsyncClientFactory {
-
- private TProtocolFactory protocolFactory;
- private TAsyncClientManager clientManager;
-
- private AtomicInteger clientSerialNum = new AtomicInteger();
-
- public TestAsyncClientFactory() throws IOException {
- protocolFactory = new Factory();
- clientManager = new TAsyncClientManager();
- }
-
- @Override
- public AsyncClient getAsyncClient(Node node, AsyncClientPool pool) throws IOException {
- return new TestAsyncClient(
- protocolFactory,
- clientManager,
- TNonblockingSocketWrapper.wrap(node.getInternalIp(), node.getMetaPort()),
- clientSerialNum.getAndIncrement());
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java
index e2e7c70..529c5e4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestAsyncMetaClient.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.cluster.common;
-import org.apache.iotdb.cluster.client.async.AsyncClientPool;
+import org.apache.iotdb.cluster.client.ClientCategory;
import org.apache.iotdb.cluster.client.async.AsyncMetaClient;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -33,12 +33,9 @@ public class TestAsyncMetaClient extends AsyncMetaClient {
private Node node;
public TestAsyncMetaClient(
- TProtocolFactory protocolFactory,
- TAsyncClientManager clientManager,
- Node node,
- AsyncClientPool pool)
+ TProtocolFactory protocolFactory, TAsyncClientManager clientManager, Node node)
throws IOException {
- super(protocolFactory, clientManager, node, pool);
+ super(protocolFactory, clientManager, node, ClientCategory.META);
this.node = node;
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java
deleted file mode 100644
index eb15dd9..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/TestSyncClientFactory.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.cluster.common;
-
-import org.apache.iotdb.cluster.client.sync.SyncClientFactory;
-import org.apache.iotdb.cluster.client.sync.SyncClientPool;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-
-import org.apache.thrift.TConfiguration;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class TestSyncClientFactory implements SyncClientFactory {
- private AtomicInteger clientSerialNum = new AtomicInteger();
- private TProtocolFactory protocolFactory = new Factory();
-
- public TestSyncClientFactory() {}
-
- @Override
- public Client getSyncClient(Node node, SyncClientPool pool) {
- TTransport dummyTransport =
- new TTransport() {
- boolean closed = false;
-
- @Override
- public boolean isOpen() {
- return !closed;
- }
-
- @Override
- public void open() {
- closed = false;
- }
-
- @Override
- public void close() {
- closed = true;
- }
-
- @Override
- public int read(byte[] bytes, int i, int i1) {
- return 0;
- }
-
- @Override
- public void write(byte[] bytes, int i, int i1) {
- // do nothing
- }
-
- @Override
- public TConfiguration getConfiguration() {
- return null;
- }
-
- @Override
- public void updateKnownMessageSize(long size) throws TTransportException {}
-
- @Override
- public void checkReadBytesAvailable(long numBytes) throws TTransportException {}
- };
- return new TestSyncClient(
- protocolFactory.getProtocol(dummyTransport),
- protocolFactory.getProtocol(dummyTransport),
- clientSerialNum.getAndIncrement());
- }
-}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
index 4fe7f29..e257c5a 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/applier/DataLogApplierTest.java
@@ -20,13 +20,10 @@
package org.apache.iotdb.cluster.log.applier;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.common.IoTDBTest;
-import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
-import org.apache.iotdb.cluster.common.TestDataGroupMember;
-import org.apache.iotdb.cluster.common.TestMetaGroupMember;
-import org.apache.iotdb.cluster.common.TestUtils;
+import org.apache.iotdb.cluster.common.*;
import org.apache.iotdb.cluster.coordinator.Coordinator;
import org.apache.iotdb.cluster.log.logtypes.CloseFileLog;
import org.apache.iotdb.cluster.log.logtypes.PhysicalPlanLog;
@@ -34,13 +31,8 @@ import org.apache.iotdb.cluster.metadata.CMManager;
import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.partition.slot.SlotPartitionTable;
-import org.apache.iotdb.cluster.rpc.thrift.GetAllPathsResult;
-import org.apache.iotdb.cluster.rpc.thrift.Node;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaRequest;
-import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
-import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.*;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-import org.apache.iotdb.cluster.rpc.thrift.TNodeStatus;
import org.apache.iotdb.cluster.server.NodeCharacter;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.RaftMember;
@@ -79,7 +71,6 @@ import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import junit.framework.TestCase;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -126,14 +117,9 @@ public class DataLogApplierTest extends IoTDBTest {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
try {
- return new TestAsyncMetaClient(null, null, node, null) {
+ return new TestAsyncMetaClient(null, null, node) {
@Override
public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
new Thread(
@@ -183,75 +169,96 @@ public class DataLogApplierTest extends IoTDBTest {
partialWriteEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert();
IoTDBDescriptor.getInstance().getConfig().setEnablePartialInsert(false);
isPartitionEnabled = IoTDBDescriptor.getInstance().getConfig().isEnablePartition();
- // //TODO fixme : 恢复正常的provider
+ // TODO fixme: restore normal provider
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
+ @Override
+ public AsyncClient borrowAsyncClient(Node node, ClientCategory category) {
+ try {
+ AsyncDataClient dataClient =
+ new AsyncDataClient(null, null, node, ClientCategory.DATA) {
+ @Override
+ public void getAllPaths(
+ RaftNode header,
+ List<String> path,
+ boolean withAlias,
+ AsyncMethodCallback<GetAllPathsResult> resultHandler) {
+ new Thread(
+ () ->
+ new DataAsyncService(testDataGroupMember)
+ .getAllPaths(header, path, withAlias, resultHandler))
+ .start();
+ }
+
+ @Override
+ public void pullTimeSeriesSchema(
+ PullSchemaRequest request,
+ AsyncMethodCallback<PullSchemaResp> resultHandler) {
+ new Thread(
+ () -> {
+ List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>();
+ for (String path : request.prefixPaths) {
+ if (path.startsWith(TestUtils.getTestSg(4))) {
+ for (int i = 0; i < 10; i++) {
+ timeseriesSchemas.add(
+ TestUtils.getTestTimeSeriesSchema(4, i));
+ }
+ } else if (!path.startsWith(TestUtils.getTestSg(5))) {
+ resultHandler.onError(
+ new StorageGroupNotSetException(path));
+ return;
+ }
+ }
+ PullSchemaResp resp = new PullSchemaResp();
+ // serialize the schemas
+ ByteArrayOutputStream byteArrayOutputStream =
+ new ByteArrayOutputStream();
+ DataOutputStream dataOutputStream =
+ new DataOutputStream(byteArrayOutputStream);
+ try {
+ dataOutputStream.writeInt(timeseriesSchemas.size());
+ for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
+ timeseriesSchema.serializeTo(dataOutputStream);
+ }
+ } catch (IOException ignored) {
+ // unreachable for we are using a ByteArrayOutputStream
+ }
+ resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
+ resultHandler.onComplete(resp);
+ })
+ .start();
+ }
+
+ @Override
+ public void pullMeasurementSchema(
+ PullSchemaRequest request,
+ AsyncMethodCallback<PullSchemaResp> resultHandler) {
+ new Thread(
+ () ->
+ new DataAsyncService(testDataGroupMember)
+ .pullMeasurementSchema(request, resultHandler))
+ .start();
+ }
+ };
+ return dataClient;
+ } catch (Exception e) {
+ return null;
+ }
+ }
+
@Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
- return new AsyncDataClient(null, null, node, null) {
- @Override
- public void getAllPaths(
- RaftNode header,
- List<String> path,
- boolean withAlias,
- AsyncMethodCallback<GetAllPathsResult> resultHandler) {
- new Thread(
- () ->
- new DataAsyncService(testDataGroupMember)
- .getAllPaths(header, path, withAlias, resultHandler))
- .start();
- }
-
- @Override
- public void pullTimeSeriesSchema(
- PullSchemaRequest request,
- AsyncMethodCallback<PullSchemaResp> resultHandler) {
- new Thread(
- () -> {
- List<TimeseriesSchema> timeseriesSchemas = new ArrayList<>();
- for (String path : request.prefixPaths) {
- if (path.startsWith(TestUtils.getTestSg(4))) {
- for (int i = 0; i < 10; i++) {
- timeseriesSchemas.add(TestUtils.getTestTimeSeriesSchema(4, i));
- }
- } else if (!path.startsWith(TestUtils.getTestSg(5))) {
- resultHandler.onError(new StorageGroupNotSetException(path));
- return;
- }
- }
- PullSchemaResp resp = new PullSchemaResp();
- // serialize the schemas
- ByteArrayOutputStream byteArrayOutputStream =
- new ByteArrayOutputStream();
- DataOutputStream dataOutputStream =
- new DataOutputStream(byteArrayOutputStream);
- try {
- dataOutputStream.writeInt(timeseriesSchemas.size());
- for (TimeseriesSchema timeseriesSchema : timeseriesSchemas) {
- timeseriesSchema.serializeTo(dataOutputStream);
- }
- } catch (IOException ignored) {
- // unreachable for we are using a ByteArrayOutputStream
- }
- resp.setSchemaBytes(byteArrayOutputStream.toByteArray());
- resultHandler.onComplete(resp);
- })
- .start();
- }
-
- @Override
- public void pullMeasurementSchema(
- PullSchemaRequest request,
- AsyncMethodCallback<PullSchemaResp> resultHandler) {
- new Thread(
- () ->
- new DataAsyncService(testDataGroupMember)
- .pullMeasurementSchema(request, resultHandler))
- .start();
- }
- };
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
}
+
+ @Override
+ public void returnAsyncClient(
+ AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
((CMManager) IoTDB.metaManager).setMetaGroupMember(testMetaGroupMember);
testDataGroupMember.setMetaGroupMember(testMetaGroupMember);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
index 9707410..9517199 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/CatchUpTaskTest.java
@@ -99,11 +99,6 @@ public class CatchUpTaskTest {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
return new TestAsyncClient() {
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
index 0252720..3fb5d30 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/LogCatchUpTaskTest.java
@@ -66,11 +66,6 @@ public class LogCatchUpTaskTest {
new TestMetaGroupMember() {
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
return new TestAsyncClient() {
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
index e9b3dc4..008c235 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/catchup/SnapshotCatchUpTaskTest.java
@@ -67,11 +67,6 @@ public class SnapshotCatchUpTaskTest {
new TestMetaGroupMember() {
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
if (noConnection) {
return null;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
index 8cbba73..a181792 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/DataSnapshotTest.java
@@ -72,11 +72,6 @@ public abstract class DataSnapshotTest {
dataGroupMember =
new TestDataGroupMember() {
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
return new AsyncDataClient(null, null, null) {
@Override
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
index 2dbe9c1..51df0e5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/log/snapshot/PullSnapshotTaskTest.java
@@ -96,11 +96,6 @@ public class PullSnapshotTaskTest extends DataSnapshotTest {
sourceMember =
new TestDataGroupMember() {
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
try {
return new TestAsyncDataClient(node, null) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
index 05ffe75..23752a7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/manage/QueryCoordinatorTest.java
@@ -73,14 +73,9 @@ public class QueryCoordinatorTest {
MetaGroupMember metaGroupMember =
new MetaGroupMember() {
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
try {
- return new TestAsyncMetaClient(new Factory(), null, node, null) {
+ return new TestAsyncMetaClient(new Factory(), null, node) {
@Override
public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
new Thread(
@@ -90,6 +85,7 @@ public class QueryCoordinatorTest {
} catch (InterruptedException e) {
// ignored
}
+
resultHandler.onComplete(nodeStatusMap.get(getNode()).getStatus());
})
.start();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
index 451e78b..3822976 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/DatasourceInfoTest.java
@@ -20,13 +20,15 @@
package org.apache.iotdb.cluster.query.reader;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -35,7 +37,6 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.Before;
import org.junit.Test;
@@ -50,10 +51,11 @@ public class DatasourceInfoTest {
public void setUp() {
metaGroupMember = new TestMetaGroupMember();
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
@Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws IOException {
return new AsyncDataClient(null, null, TestUtils.getNode(0), null) {
@Override
public void querySingleSeries(
@@ -63,6 +65,19 @@ public class DatasourceInfoTest {
}
};
}
+
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
+ }
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
index de4d588..bccfab5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSeriesReaderByTimestampTest.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.cluster.query.reader;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -28,6 +29,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -38,7 +40,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -65,10 +66,11 @@ public class RemoteSeriesReaderByTimestampTest {
prevUseAsyncServer = ClusterDescriptor.getInstance().getConfig().isUseAsyncServer();
ClusterDescriptor.getInstance().getConfig().setUseAsyncServer(true);
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
@Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
public void fetchSingleSeriesByTimestamps(
@@ -110,19 +112,21 @@ public class RemoteSeriesReaderByTimestampTest {
})
.start();
}
-
- @Override
- public void querySingleSeriesByTimestamp(
- SingleSeriesQueryRequest request, AsyncMethodCallback<Long> resultHandler)
- throws TException {
- if (failedNodes.contains(node)) {
- throw new TException("Node down.");
- }
-
- new Thread(() -> resultHandler.onComplete(1L)).start();
- }
};
}
+
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
+ }
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
index d3e6eca..a99beb5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/RemoteSimpleSeriesReaderTest.java
@@ -20,7 +20,8 @@
package org.apache.iotdb.cluster.query.reader;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
@@ -29,6 +30,7 @@ import org.apache.iotdb.cluster.partition.PartitionGroup;
import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
@@ -40,7 +42,6 @@ import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -72,12 +73,13 @@ public class RemoteSimpleSeriesReaderTest {
batchData = TestUtils.genBatchData(TSDataType.DOUBLE, 0, 100);
batchUsed = false;
metaGroupMember = new TestMetaGroupMember();
- // TODO fixme : 恢复正常的provider
+ // TODO fixme : restore normal provider
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
@Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
public void fetchSingleSeries(
@@ -117,6 +119,19 @@ public class RemoteSimpleSeriesReaderTest {
}
};
}
+
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
+ }
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
index 31a846f..2533db1 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/AssignPathManagedMergeReaderTest.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.cluster.query.reader.mult;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
import org.apache.iotdb.cluster.common.TestUtils;
@@ -29,6 +30,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -43,7 +45,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -130,10 +131,11 @@ public class AssignPathManagedMergeReaderTest {
private void setAsyncDataClient() {
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
@Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
public void fetchMultSeries(
@@ -180,6 +182,19 @@ public class AssignPathManagedMergeReaderTest {
}
};
}
+
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
+ }
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
index 784e203..8240cd8 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/mult/RemoteMultSeriesReaderTest.java
@@ -19,7 +19,8 @@
package org.apache.iotdb.cluster.query.reader.mult;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
+import org.apache.iotdb.cluster.client.ClientCategory;
+import org.apache.iotdb.cluster.client.IClientManager;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.common.TestMetaGroupMember;
@@ -30,6 +31,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.cluster.rpc.thrift.MultSeriesQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
+import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -44,7 +46,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -183,10 +184,11 @@ public class RemoteMultSeriesReaderTest {
private void setAsyncDataClient() {
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
@Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws IOException {
return new AsyncDataClient(null, null, node, null) {
@Override
public void fetchMultSeries(
@@ -233,15 +235,34 @@ public class RemoteMultSeriesReaderTest {
}
};
}
+
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
+ return null;
+ }
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
}
private void setSyncDataClient() {
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
+ .setClientManager(
+ new IClientManager() {
@Override
- public SyncDataClient getSyncDataClient(Node node, int timeout) {
+ public RaftService.AsyncClient borrowAsyncClient(Node node, ClientCategory category)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public RaftService.Client borrowSyncClient(Node node, ClientCategory category) {
return new SyncDataClient(null) {
@Override
public Map<String, ByteBuffer> fetchMultSeries(
@@ -275,6 +296,14 @@ public class RemoteMultSeriesReaderTest {
}
};
}
+
+ @Override
+ public void returnAsyncClient(
+ RaftService.AsyncClient client, Node node, ClientCategory category) {}
+
+ @Override
+ public void returnSyncClient(
+ RaftService.Client client, Node node, ClientCategory category) {}
});
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java
index 95c5347..adb8a54 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/DataHeartbeatThreadTest.java
@@ -69,11 +69,6 @@ public class DataHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getClient(node);
- }
-
- @Override
public AsyncClient getAsyncHeartbeatClient(Node node) {
return getClient(node);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
index 512e67b..396e8fa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatThreadTest.java
@@ -83,11 +83,6 @@ public class HeartbeatThreadTest {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getClient(node);
- }
-
- @Override
public AsyncClient getAsyncHeartbeatClient(Node node) {
return getClient(node);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
index 548ead5..63b4271 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/heartbeat/MetaHeartbeatThreadTest.java
@@ -144,11 +144,6 @@ public class MetaHeartbeatThreadTest extends HeartbeatThreadTest {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getClient(node);
- }
-
- @Override
public AsyncClient getAsyncHeartbeatClient(Node node) {
return getClient(node);
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
index aa3e0f6..9814436 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/BaseMember.java
@@ -20,8 +20,7 @@
package org.apache.iotdb.cluster.server.member;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.ClientManager;
import org.apache.iotdb.cluster.common.TestAsyncDataClient;
import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
import org.apache.iotdb.cluster.common.TestDataGroupMember;
@@ -59,7 +58,6 @@ import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
@@ -236,15 +234,6 @@ public class BaseMember {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- try {
- return new TestAsyncDataClient(node, dataGroupMemberMap);
- } catch (IOException e) {
- return null;
- }
- }
-
- @Override
public AsyncClient getSendLogAsyncClient(Node node) {
return getAsyncClient(node);
}
@@ -294,7 +283,7 @@ public class BaseMember {
@Override
public AsyncClient getAsyncClient(Node node) {
try {
- return new TestAsyncMetaClient(null, null, node, null) {
+ return new TestAsyncMetaClient(null, null, node) {
@Override
public void queryNodeStatus(AsyncMethodCallback<TNodeStatus> resultHandler) {
new Thread(() -> resultHandler.onComplete(new TNodeStatus())).start();
@@ -318,15 +307,12 @@ public class BaseMember {
ret.setLeader(node);
ret.setCharacter(NodeCharacter.LEADER);
ret.setAppendLogThreadPool(testThreadPool);
- // TODO fixme : 恢复正常的provider
+ // TODO fixme : restore normal provider
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new Factory()) {
- @Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
- return new TestAsyncDataClient(node, dataGroupMemberMap);
- }
- });
+ .setClientManager(
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient));
return ret;
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index a1abb8e..c947253 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -205,11 +205,6 @@ public class DataGroupMemberTest extends BaseMember {
}
@Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getAsyncClient(node);
- }
-
- @Override
public AsyncClient getAsyncClient(Node node) {
try {
return new TestAsyncDataClient(node, dataGroupMemberMap) {
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 33aee68..54ee250 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -20,10 +20,8 @@
package org.apache.iotdb.cluster.server.member;
import org.apache.iotdb.cluster.ClusterIoTDB;
-import org.apache.iotdb.cluster.client.DataClientProvider;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient;
+import org.apache.iotdb.cluster.client.ClientManager;
import org.apache.iotdb.cluster.common.TestAsyncClient;
-import org.apache.iotdb.cluster.common.TestAsyncDataClient;
import org.apache.iotdb.cluster.common.TestAsyncMetaClient;
import org.apache.iotdb.cluster.common.TestPartitionedLogManager;
import org.apache.iotdb.cluster.common.TestSnapshot;
@@ -110,7 +108,6 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
import org.apache.thrift.async.AsyncMethodCallback;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol.Factory;
import org.junit.After;
import org.junit.Assert;
@@ -389,14 +386,9 @@ public class MetaGroupMemberTest extends BaseMember {
return getClient(node);
}
- @Override
- public AsyncClient getAsyncClient(Node node, boolean activatedOnly) {
- return getClient(node);
- }
-
AsyncClient getClient(Node node) {
try {
- return new TestAsyncMetaClient(null, null, node, null) {
+ return new TestAsyncMetaClient(null, null, node) {
@Override
public void startElection(
ElectionRequest request, AsyncMethodCallback<Long> resultHandler) {
@@ -543,15 +535,12 @@ public class MetaGroupMemberTest extends BaseMember {
metaGroupMember.setAllNodes(allNodes);
metaGroupMember.setCharacter(NodeCharacter.LEADER);
metaGroupMember.setAppendLogThreadPool(testThreadPool);
- // TODO fixme : 恢复正常的provider
+ // TODO fixme : restore normal provider
ClusterIoTDB.getInstance()
- .setClientProvider(
- new DataClientProvider(new TBinaryProtocol.Factory()) {
- @Override
- public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
- return new TestAsyncDataClient(node, dataGroupMemberMap);
- }
- });
+ .setClientManager(
+ new ClientManager(
+ ClusterDescriptor.getInstance().getConfig().isUseAsyncServer(),
+ ClientManager.Type.RequestForwardClient));
return metaGroupMember;
}
diff --git a/pom.xml b/pom.xml
index 0bd5721..9fe6245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -132,6 +132,7 @@
<common.collections.version>3.2.2</common.collections.version>
<common.lang3.version>3.8.1</common.lang3.version>
<common.logging.version>1.1.3</common.logging.version>
+ <common.pool2.version>2.11.1</common.pool2.version>
<org.slf4j.version>1.7.30</org.slf4j.version>
<guava.version>24.1.1</guava.version>
<jline.version>2.14.5</jline.version>
@@ -332,6 +333,11 @@
<version>${common.lang3.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>${common.pool2.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop2.version}</version>