You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/12/26 11:57:25 UTC
[iotdb] branch master updated: [IOTDB-5260] Refactor ClientManager API and Exception (#8561)
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new daab49b61a [IOTDB-5260] Refactor ClientManager API and Exception (#8561)
daab49b61a is described below
commit daab49b61ac20b7c9d83ffc1d6003816abc324aa
Author: Potato <ta...@apache.org>
AuthorDate: Mon Dec 26 19:57:19 2022 +0800
[IOTDB-5260] Refactor ClientManager API and Exception (#8561)
Refactor ClientManager API and Exception
* fix ci
* update javadoc format
* catch throwable for sync client
* fix code smell
* fix review
Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
.../async/AsyncConfigNodeHeartbeatClientPool.java | 6 +-
.../client/async/AsyncDataNodeClientPool.java | 5 +-
.../async/AsyncDataNodeHeartbeatClientPool.java | 6 +-
.../client/sync/SyncConfigNodeClientPool.java | 6 +-
.../client/sync/SyncDataNodeClientPool.java | 8 +--
.../consensus/iot/IoTConsensusServerImpl.java | 17 ++---
.../consensus/iot/logdispatcher/LogDispatcher.java | 3 +-
.../iotdb/consensus/ratis/RatisConsensus.java | 21 +++---
integration-test/import-control.xml | 1 +
.../java/org/apache/iotdb/it/env/AbstractEnv.java | 28 ++++----
.../org/apache/iotdb/it/env/RemoteServerEnv.java | 34 +++++-----
.../java/org/apache/iotdb/itbase/env/BaseEnv.java | 3 +-
.../confignode/it/IoTDBConfigNodeSnapshotIT.java | 3 +-
.../iotdb/confignode/it/IoTDBStorageGroupIT.java | 8 +--
.../it/cluster/IoTDBClusterNodeErrorStartUpIT.java | 7 +-
.../it/cluster/IoTDBClusterNodeGetterIT.java | 8 +--
.../load/IoTDBClusterRegionLeaderBalancingIT.java | 6 +-
.../it/load/IoTDBConfigNodeSwitchLeaderIT.java | 5 +-
.../partition/IoTDBAutoRegionGroupExtensionIT.java | 4 +-
.../IoTDBCustomRegionGroupExtensionIT.java | 6 +-
.../it/partition/IoTDBPartitionDurableIT.java | 12 ++--
.../it/partition/IoTDBPartitionGetterIT.java | 17 ++---
.../partition/IoTDBPartitionInheritPolicyIT.java | 5 +-
.../commons/client/ClientFactoryProperty.java | 7 +-
.../apache/iotdb/commons/client/ClientManager.java | 78 +++++++++-------------
.../iotdb/commons/client/ClientPoolProperty.java | 18 +++--
.../iotdb/commons/client/IClientManager.java | 18 ++---
.../iotdb/commons/client/IClientPoolFactory.java | 7 +-
.../ClientManagerException.java} | 12 ++--
.../iotdb/commons/service/ThriftServiceThread.java | 2 +-
.../iotdb/commons/client/ClientManagerTest.java | 22 +++---
.../iotdb/db/auth/ClusterAuthorityFetcher.java | 10 +--
.../metadata/template/ClusterTemplateManager.java | 6 +-
.../db/mpp/execution/exchange/SinkHandle.java | 4 +-
.../apache/iotdb/db/mpp/plan/TestRPCClient.java | 13 ++--
.../iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java | 3 +-
.../mpp/plan/analyze/ClusterPartitionFetcher.java | 17 +++--
.../db/mpp/plan/analyze/cache/PartitionCache.java | 11 +--
.../config/executor/ClusterConfigTaskExecutor.java | 57 ++++++++--------
.../scheduler/AbstractFragInsStateTracker.java | 4 +-
.../scheduler/FixedRateFragInsStateTracker.java | 4 +-
.../scheduler/FragmentInstanceDispatcherImpl.java | 4 +-
.../mpp/plan/scheduler/SimpleQueryTerminator.java | 7 +-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 6 +-
.../db/trigger/executor/TriggerFireVisitor.java | 10 ++-
.../db/mpp/execution/exchange/SinkHandleTest.java | 9 +--
.../mpp/execution/exchange/SourceHandleTest.java | 12 ++--
47 files changed, 267 insertions(+), 293 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
index 8dd67da315..3c2e3072f3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncConfigNodeHeartbeatClientPool.java
@@ -42,12 +42,8 @@ public class AsyncConfigNodeHeartbeatClientPool {
*/
public void getConfigNodeHeartBeat(
TEndPoint endPoint, long timestamp, ConfigNodeHeartbeatHandler handler) {
- AsyncConfigNodeHeartbeatServiceClient client;
try {
- client = clientManager.purelyBorrowClient(endPoint);
- if (client != null) {
- client.getConfigNodeHeartBeat(timestamp, handler);
- }
+ clientManager.borrowClient(endPoint).getConfigNodeHeartBeat(timestamp, handler);
} catch (Exception ignore) {
// Just ignore
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
index 3973689b3b..7f0404cc69 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeClientPool.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.client.async.handlers.rpc.AsyncTSStatusRPCHandler;
@@ -60,8 +61,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
/** Asynchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
public class AsyncDataNodeClientPool {
@@ -350,7 +349,7 @@ public class AsyncDataNodeClientPool {
}
public AsyncDataNodeInternalServiceClient getAsyncClient(TDataNodeLocation targetDataNode)
- throws IOException {
+ throws ClientManagerException {
return clientManager.borrowClient(targetDataNode.getInternalEndPoint());
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
index a42616c722..1a92dd6ac9 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/async/AsyncDataNodeHeartbeatClientPool.java
@@ -44,12 +44,8 @@ public class AsyncDataNodeHeartbeatClientPool {
*/
public void getDataNodeHeartBeat(
TEndPoint endPoint, THeartbeatReq req, DataNodeHeartbeatHandler handler) {
- AsyncDataNodeHeartbeatServiceClient client;
try {
- client = clientManager.purelyBorrowClient(endPoint);
- if (client != null) {
- client.getDataNodeHeartBeat(req, handler);
- }
+ clientManager.borrowClient(endPoint).getDataNodeHeartBeat(req, handler);
} catch (Exception ignore) {
// Just ignore
}
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
index cb8e62b595..0d0f5336f3 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncConfigNodeClientPool.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.client.ConfigNodeRequestType;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
@@ -35,7 +36,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
/** Synchronously send RPC requests to ConfigNode. See confignode.thrift for more details. */
@@ -93,7 +93,7 @@ public class SyncConfigNodeClientPool {
return RpcUtils.getStatus(
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Unknown request type: " + requestType);
}
- } catch (Throwable e) {
+ } catch (Exception e) {
lastException = e;
LOGGER.warn(
"{} failed on ConfigNode {}, because {}, retrying {}...",
@@ -118,7 +118,7 @@ public class SyncConfigNodeClientPool {
*/
public TSStatus removeConfigNode(
TConfigNodeLocation configNodeLocation, SyncConfigNodeIServiceClient client)
- throws TException, IOException, InterruptedException {
+ throws ClientManagerException, TException, InterruptedException {
TSStatus status = client.removeConfigNode(configNodeLocation);
while (status.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
TimeUnit.MILLISECONDS.sleep(2000);
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 3de327ecbb..3136e05da1 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
@@ -42,7 +43,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
/** Synchronously send RPC requests to DataNodes. See mpp.thrift for more details. */
@@ -67,7 +67,7 @@ public class SyncDataNodeClientPool {
for (int retry = 0; retry < DEFAULT_RETRY_NUM; retry++) {
try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
return executeSyncRequest(requestType, client, req);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
lastException = e;
if (retry != DEFAULT_RETRY_NUM - 1) {
LOGGER.warn("{} failed on DataNode {}, retrying {}...", requestType, endPoint, retry + 1);
@@ -86,7 +86,7 @@ public class SyncDataNodeClientPool {
for (int retry = 0; retry < retryNum; retry++) {
try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(endPoint)) {
return executeSyncRequest(requestType, client, req);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
lastException = e;
if (retry != retryNum - 1) {
LOGGER.warn("{} failed on DataNode {}, retrying {}...", requestType, endPoint, retry + 1);
@@ -167,7 +167,7 @@ public class SyncDataNodeClientPool {
try (SyncDataNodeInternalServiceClient client = clientManager.borrowClient(dataNode)) {
TRegionLeaderChangeReq req = new TRegionLeaderChangeReq(regionId, newLeaderNode);
status = client.changeRegionLeader(req);
- } catch (IOException e) {
+ } catch (ClientManagerException e) {
LOGGER.error("Can't connect to Data node: {}", dataNode, e);
status = new TSStatus(TSStatusCode.CAN_NOT_CONNECT_DATANODE.getStatusCode());
status.setMessage(e.getMessage());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index d66d29cc6e..1d668c281a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
@@ -332,7 +333,7 @@ public class IoTConsensusServerImpl {
reader.close();
}
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when send snapshot file to %s", targetPeer), e);
}
@@ -403,7 +404,7 @@ public class IoTConsensusServerImpl {
throw new ConsensusGroupModifyPeerException(
String.format("error when inactivating %s. %s", peer, res.getStatus()));
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when inactivating %s", peer), e);
}
@@ -420,7 +421,7 @@ public class IoTConsensusServerImpl {
throw new ConsensusGroupModifyPeerException(
String.format("error when triggering snapshot load %s. %s", peer, res.getStatus()));
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s", peer), e);
}
@@ -435,7 +436,7 @@ public class IoTConsensusServerImpl {
throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s. %s", peer, res.getStatus()));
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when activating %s", peer), e);
}
@@ -470,7 +471,7 @@ public class IoTConsensusServerImpl {
throw new ConsensusGroupModifyPeerException(
String.format("build sync log channel failed from %s to %s", peer, targetPeer));
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
// We use a simple way to deal with the connection issue when notifying other nodes to
// build sync log. If the un-responsible peer is the peer which will be removed, we cannot
// suspend the operation and need to skip it. In order to keep the mechanism works fine,
@@ -513,7 +514,7 @@ public class IoTConsensusServerImpl {
throw new ConsensusGroupModifyPeerException(
String.format("remove sync log channel failed from %s to %s", peer, targetPeer));
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("error when removing sync log channel to %s", peer), e);
}
@@ -545,7 +546,7 @@ public class IoTConsensusServerImpl {
res.safeIndex);
Thread.sleep(checkIntervalInMs);
}
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
throw new ConsensusGroupModifyPeerException(
String.format(
"error when waiting %s to complete SyncLog. %s", targetPeer, e.getMessage()),
@@ -750,7 +751,7 @@ public class IoTConsensusServerImpl {
String.format(
"cleanup remote snapshot failed of %s ,status is %s", targetPeer, res.getStatus()));
}
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new ConsensusGroupModifyPeerException(
String.format("cleanup remote snapshot failed of %s", targetPeer), e);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index fecccc1403..752a392ed1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -37,7 +37,6 @@ import org.apache.iotdb.consensus.iot.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.iot.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.metrics.utils.MetricLevel;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -450,7 +449,7 @@ public class LogDispatcher {
batch.getEndIndex(),
peer.getGroupId().convertToTConsensusGroupId());
client.syncLogEntries(req, handler);
- } catch (IOException | TException e) {
+ } catch (Exception e) {
logger.error("Can not sync logs to peer {} because", peer, e);
handler.onError(e);
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 151bf17bb4..e00374c347 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -73,7 +74,6 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.function.CheckedSupplier;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -243,7 +243,7 @@ class RatisConsensus implements IConsensus {
if (isLeader(consensusGroupId) && CommonDescriptor.getInstance().getConfig().isReadOnly()) {
try {
forceStepDownLeader(raftGroup);
- } catch (IOException e) {
+ } catch (Exception e) {
logger.warn("leader {} read only, force step down failed due to {}", myself, e);
}
return failedWrite(new NodeReadOnlyException(myself));
@@ -284,7 +284,7 @@ class RatisConsensus implements IConsensus {
return failedWrite(new RatisRequestFailedException(reply.getException()));
}
writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
- } catch (IOException | TException e) {
+ } catch (Exception e) {
return failedWrite(new RatisRequestFailedException(e));
} finally {
if (client != null) {
@@ -357,7 +357,7 @@ class RatisConsensus implements IConsensus {
if (!reply.isSuccess()) {
return failed(new RatisRequestFailedException(reply.getException()));
}
- } catch (IOException e) {
+ } catch (Exception e) {
return failed(new RatisRequestFailedException(e));
} finally {
if (client != null) {
@@ -550,7 +550,7 @@ class RatisConsensus implements IConsensus {
if (!reply.isSuccess()) {
return failed(new RatisRequestFailedException(reply.getException()));
}
- } catch (IOException e) {
+ } catch (Exception e) {
return failed(new RatisRequestFailedException(e));
} finally {
if (client != null) {
@@ -560,13 +560,14 @@ class RatisConsensus implements IConsensus {
return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
}
- private void forceStepDownLeader(RaftGroup group) throws IOException {
+ private void forceStepDownLeader(RaftGroup group) throws ClientManagerException, IOException {
// when newLeaderPeerId == null, ratis forces current leader to step down and raise new
// election
transferLeader(group, null);
}
- private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader) throws IOException {
+ private RaftClientReply transferLeader(RaftGroup group, RaftPeer newLeader)
+ throws ClientManagerException, IOException {
RatisClient client = null;
try {
client = getRaftClient(group);
@@ -770,10 +771,10 @@ class RatisConsensus implements IConsensus {
Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY));
}
- private RatisClient getRaftClient(RaftGroup group) throws IOException {
+ private RatisClient getRaftClient(RaftGroup group) throws ClientManagerException {
try {
return clientManager.borrowClient(group);
- } catch (IOException e) {
+ } catch (ClientManagerException e) {
logger.error(String.format("Borrow client from pool for group %s failed.", group), e);
// rethrow the exception
throw e;
@@ -792,7 +793,7 @@ class RatisConsensus implements IConsensus {
if (!reply.isSuccess()) {
throw new RatisRequestFailedException(reply.getException());
}
- } catch (IOException e) {
+ } catch (Exception e) {
throw new RatisRequestFailedException(e);
} finally {
if (client != null) {
diff --git a/integration-test/import-control.xml b/integration-test/import-control.xml
index 829fc662f2..0465588691 100644
--- a/integration-test/import-control.xml
+++ b/integration-test/import-control.xml
@@ -73,6 +73,7 @@
<allow class="org.apache.iotdb.commons.cq.CQState" />
<allow class="org.apache.iotdb.consensus.ConsensusFactory" />
<allow class="org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils" />
+ <allow class="org.apache.iotdb.commons.client.exception.ClientManagerException" />
<allow pkg="org\.apache\.iotdb\.common\.rpc\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.confignode\.rpc\.thrift.*" regex="true" />
<allow pkg="org\.apache\.iotdb\.commons\.client\.sync.*" regex="true" />
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
index 72e92ccd50..98ae650290 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/AbstractEnv.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.it.env;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
@@ -75,10 +76,17 @@ public abstract class AbstractEnv implements BaseEnv {
public static final String templateNodePath =
System.getProperty("user.dir") + File.separator + "target" + File.separator + "template-node";
+ private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
+
protected void initEnvironment(int configNodesNum, int dataNodesNum) {
this.configNodeWrapperList = new ArrayList<>();
this.dataNodeWrapperList = new ArrayList<>();
+ clientManager =
+ new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+
final String testClassName = getTestClassName();
final String testMethodName = getTestMethodName();
@@ -167,6 +175,7 @@ public abstract class AbstractEnv implements BaseEnv {
logger.error("Delete lock file {} failed", lockPath);
}
}
+ clientManager.close();
testMethodName = null;
}
@@ -435,15 +444,12 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection()
throws IOException, InterruptedException {
- IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
- new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
+
for (int i = 0; i < 30; i++) {
for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
try {
SyncConfigNodeIServiceClient client =
- clientManager.purelyBorrowClient(
+ clientManager.borrowClient(
new TEndPoint(configNodeWrapper.getIp(), configNodeWrapper.getPort()));
TShowClusterResp resp = client.showCluster();
@@ -460,8 +466,10 @@ public abstract class AbstractEnv implements BaseEnv {
}
} catch (Exception e) {
logger.error(
- "Borrow ConfigNodeClient from ConfigNode: {} failed, retrying...",
- configNodeWrapper.getIpAndPortString());
+ String.format(
+ "Borrow ConfigNodeClient from ConfigNode: %s failed, retrying...",
+ configNodeWrapper.getIpAndPortString()),
+ e);
}
// Sleep 1s before next retry
@@ -473,10 +481,6 @@ public abstract class AbstractEnv implements BaseEnv {
@Override
public int getLeaderConfigNodeIndex() throws IOException, InterruptedException {
- IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
- new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
for (int retry = 0; retry < 30; retry++) {
for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); configNodeId++) {
ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(configNodeId);
@@ -489,7 +493,7 @@ public abstract class AbstractEnv implements BaseEnv {
if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return configNodeId;
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
logger.error(
"Borrow ConfigNodeClient from ConfigNode: {} failed because: {}, retrying...",
configNodeWrapper.getIp(),
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
index ff9595cead..0187623e7d 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/RemoteServerEnv.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.it.env;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
@@ -41,10 +42,12 @@ import static org.apache.iotdb.jdbc.Config.VERSION;
import static org.junit.Assert.fail;
public class RemoteServerEnv implements BaseEnv {
- private String ip_addr = System.getProperty("RemoteIp", "127.0.0.1");
- private String port = System.getProperty("RemotePort", "6667");
- private String user = System.getProperty("RemoteUser", "root");
- private String password = System.getProperty("RemotePassword", "root");
+
+ private final String ip_addr = System.getProperty("RemoteIp", "127.0.0.1");
+ private final String port = System.getProperty("RemotePort", "6667");
+ private final String user = System.getProperty("RemoteUser", "root");
+ private final String password = System.getProperty("RemotePassword", "root");
+ private IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager;
@Override
public void initBeforeClass() {
@@ -56,6 +59,10 @@ public class RemoteServerEnv implements BaseEnv {
e.printStackTrace();
fail(e.getMessage());
}
+ clientManager =
+ new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
+ .createClientManager(
+ new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
}
@Override
@@ -64,7 +71,9 @@ public class RemoteServerEnv implements BaseEnv {
}
@Override
- public void cleanAfterClass() {}
+ public void cleanAfterClass() {
+ clientManager.close();
+ }
@Override
public void initBeforeTest() {
@@ -79,7 +88,9 @@ public class RemoteServerEnv implements BaseEnv {
}
@Override
- public void cleanAfterTest() {}
+ public void cleanAfterTest() {
+ clientManager.close();
+ }
@Override
public Connection getConnection(String username, String password) throws SQLException {
@@ -151,15 +162,8 @@ public class RemoteServerEnv implements BaseEnv {
}
@Override
- public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws IOException {
- IClientManager<TEndPoint, SyncConfigNodeIServiceClient> clientManager =
- new IClientManager.Factory<TEndPoint, SyncConfigNodeIServiceClient>()
- .createClientManager(
- new DataNodeClientPoolFactory.SyncConfigNodeIServiceClientPoolFactory());
- try (SyncConfigNodeIServiceClient client =
- clientManager.borrowClient(new TEndPoint(ip_addr, 22277))) {
- return client;
- }
+ public IConfigNodeRPCService.Iface getLeaderConfigNodeConnection() throws ClientManagerException {
+ return clientManager.borrowClient(new TEndPoint(ip_addr, 22277));
}
@Override
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
index 0d88c66842..efaf71cd7c 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.itbase.env;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionConfig;
@@ -73,7 +74,7 @@ public interface BaseEnv {
void setDataNodeWrapperList(List<DataNodeWrapper> dataNodeWrapperList);
IConfigNodeRPCService.Iface getLeaderConfigNodeConnection()
- throws IOException, InterruptedException;
+ throws ClientManagerException, IOException, InterruptedException;
default ISession getSessionConnection() throws IoTDBConnectionException {
return getSessionConnection(
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
index dbff815988..c9d30c6668 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBConfigNodeSnapshotIT.java
@@ -118,8 +118,7 @@ public class IoTDBConfigNodeSnapshotIT {
}
@Test
- public void testPartitionInfoSnapshot()
- throws IOException, IllegalPathException, TException, InterruptedException {
+ public void testPartitionInfoSnapshot() throws Exception {
final String sg = "root.sg";
final int storageGroupNum = 10;
final int seriesPartitionSlotsNum = 10;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java
index 0010a56420..b305c07ae7 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBStorageGroupIT.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.confignode.it;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
@@ -36,7 +35,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -44,7 +42,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -65,8 +62,7 @@ public class IoTDBStorageGroupIT {
}
@Test
- public void testSetAndQueryStorageGroup()
- throws IOException, TException, IllegalPathException, InterruptedException {
+ public void testSetAndQueryStorageGroup() throws Exception {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -156,7 +152,7 @@ public class IoTDBStorageGroupIT {
}
@Test
- public void testDeleteStorageGroup() throws TException, IOException, InterruptedException {
+ public void testDeleteStorageGroup() throws Exception {
TSStatus status;
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
index 2c924c9589..65b8bedad1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeErrorStartUpIT.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.it.cluster;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
@@ -86,7 +87,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
}
@Test
- public void testConflictNodeRegistration() throws IOException, InterruptedException, TException {
+ public void testConflictNodeRegistration()
+ throws ClientManagerException, InterruptedException, TException, IOException {
/* Test ConfigNode conflict register */
// Construct a ConfigNodeWrapper that conflicts in consensus port with an existed one.
@@ -142,7 +144,8 @@ public class IoTDBClusterNodeErrorStartUpIT {
}
@Test
- public void testIllegalNodeRestart() throws IOException, InterruptedException, TException {
+ public void testIllegalNodeRestart()
+ throws ClientManagerException, IOException, InterruptedException, TException {
ConfigNodeWrapper registeredConfigNodeWrapper = EnvFactory.getEnv().getConfigNodeWrapper(1);
DataNodeWrapper registeredDataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
index dcaf66018b..79bbd29c38 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/cluster/IoTDBClusterNodeGetterIT.java
@@ -42,7 +42,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -50,7 +49,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -100,7 +98,7 @@ public class IoTDBClusterNodeGetterIT {
}
@Test
- public void showClusterAndNodesTest() throws IOException, InterruptedException, TException {
+ public void showClusterAndNodesTest() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -181,7 +179,7 @@ public class IoTDBClusterNodeGetterIT {
}
@Test
- public void removeAndStopConfigNodeTest() throws TException, IOException, InterruptedException {
+ public void removeAndStopConfigNodeTest() throws Exception {
TShowClusterResp showClusterResp;
TSStatus status;
@@ -217,7 +215,7 @@ public class IoTDBClusterNodeGetterIT {
}
@Test
- public void queryAndRemoveDataNodeTest() throws TException, IOException, InterruptedException {
+ public void queryAndRemoveDataNodeTest() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
index 18f1d2f868..faf6a6ccdb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBClusterRegionLeaderBalancingIT.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.env.BaseConfig;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -48,7 +47,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -112,7 +110,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
}
@Test
- public void testGreedyLeaderDistribution() throws IOException, InterruptedException, TException {
+ public void testGreedyLeaderDistribution() throws Exception {
final int testConfigNodeNum = 1;
final int testDataNodeNum = 3;
EnvFactory.getEnv().initClusterEnvironment(testConfigNodeNum, testDataNodeNum);
@@ -165,7 +163,7 @@ public class IoTDBClusterRegionLeaderBalancingIT {
}
@Test
- public void testMCFLeaderDistribution() throws IOException, InterruptedException, TException {
+ public void testMCFLeaderDistribution() throws Exception {
final int testConfigNodeNum = 1;
final int testDataNodeNum = 3;
final int retryNum = 50;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
index 8205a3e5a6..81abd2087c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/load/IoTDBConfigNodeSwitchLeaderIT.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -38,7 +37,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -118,8 +116,7 @@ public class IoTDBConfigNodeSwitchLeaderIT {
}
@Test
- public void basicDataInheritIT()
- throws IOException, TException, IllegalPathException, InterruptedException {
+ public void basicDataInheritIT() throws Exception {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
final String d00 = sg0 + ".d0.s";
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
index d35205f274..32282d19ab 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBAutoRegionGroupExtensionIT.java
@@ -48,7 +48,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -113,8 +112,7 @@ public class IoTDBAutoRegionGroupExtensionIT {
}
@Test
- public void testAutoRegionGroupExtensionPolicy()
- throws IOException, InterruptedException, TException {
+ public void testAutoRegionGroupExtensionPolicy() throws Exception {
final int retryNum = 100;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java
index effb799fbc..0d22f3982c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBCustomRegionGroupExtensionIT.java
@@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionTableResp;
@@ -41,7 +40,6 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.env.BaseConfig;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -49,7 +47,6 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -131,8 +128,7 @@ public class IoTDBCustomRegionGroupExtensionIT {
}
@Test
- public void testCustomRegionGroupExtensionPolicy()
- throws IOException, InterruptedException, TException, IllegalPathException {
+ public void testCustomRegionGroupExtensionPolicy() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
index c9de411582..665155e8ae 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionDurableIT.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -50,7 +49,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -60,7 +58,6 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -129,7 +126,7 @@ public class IoTDBPartitionDurableIT {
setStorageGroup();
}
- private void setStorageGroup() throws IOException, InterruptedException, TException {
+ private void setStorageGroup() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
TSetStorageGroupReq setStorageGroupReq = new TSetStorageGroupReq(new TStorageGroupSchema(sg));
@@ -156,8 +153,7 @@ public class IoTDBPartitionDurableIT {
}
@Test
- public void testRemovingDataNode()
- throws IOException, InterruptedException, TException, IllegalPathException {
+ public void testRemovingDataNode() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -283,7 +279,7 @@ public class IoTDBPartitionDurableIT {
}
@Test
- public void testReadOnlyDataNode() throws IOException, InterruptedException, TException {
+ public void testReadOnlyDataNode() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
@@ -434,7 +430,7 @@ public class IoTDBPartitionDurableIT {
}
@Test
- public void testUnknownDataNode() throws IOException, TException, InterruptedException {
+ public void testUnknownDataNode() throws Exception {
// Shutdown a DataNode, the ConfigNode should still be able to create RegionGroup
EnvFactory.getEnv().shutdownDataNode(testDataNodeId);
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
index be66888bd9..edfad728f9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionGetterIT.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
-import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TDataPartitionReq;
@@ -54,7 +53,6 @@ import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.env.BaseConfig;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -64,7 +62,6 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -137,8 +134,7 @@ public class IoTDBPartitionGetterIT {
prepareData();
}
- private static void prepareData()
- throws IOException, InterruptedException, TException, IllegalPathException {
+ private static void prepareData() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
/* Set StorageGroups */
@@ -242,8 +238,7 @@ public class IoTDBPartitionGetterIT {
}
@Test
- public void testGetSchemaPartition()
- throws TException, IOException, IllegalPathException, InterruptedException {
+ public void testGetSchemaPartition() throws Exception {
final String sg = "root.sg";
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -306,7 +301,7 @@ public class IoTDBPartitionGetterIT {
}
@Test
- public void testGetDataPartition() throws TException, IOException, InterruptedException {
+ public void testGetDataPartition() throws Exception {
final int seriesPartitionBatchSize = 100;
final int timePartitionBatchSize = 10;
@@ -384,8 +379,7 @@ public class IoTDBPartitionGetterIT {
}
@Test
- public void testGetSlots()
- throws TException, IOException, IllegalPathException, InterruptedException {
+ public void testGetSlots() throws Exception {
final String sg0 = "root.sg0";
final String sg1 = "root.sg1";
@@ -532,8 +526,7 @@ public class IoTDBPartitionGetterIT {
}
@Test
- public void testGetSchemaNodeManagementPartition()
- throws IOException, TException, IllegalPathException, InterruptedException {
+ public void testGetSchemaNodeManagementPartition() throws Exception {
TSchemaNodeManagementReq nodeManagementReq;
TSchemaNodeManagementResp nodeManagementResp;
diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
index 6e8ae39c83..f9dd8af7e9 100644
--- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/IoTDBPartitionInheritPolicyIT.java
@@ -36,7 +36,6 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -46,7 +45,6 @@ import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -127,8 +125,7 @@ public class IoTDBPartitionInheritPolicyIT {
}
@Test
- public void testDataPartitionInheritPolicy()
- throws TException, IOException, InterruptedException {
+ public void testDataPartitionInheritPolicy() throws Exception {
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
index 4054cbc4fe..363decbf2d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientFactoryProperty.java
@@ -51,11 +51,12 @@ public class ClientFactoryProperty {
}
public static class Builder {
- // whether to use thrift compression
+
+ /** whether to use thrift compression. */
private boolean rpcThriftCompressionEnabled = DefaultProperty.RPC_THRIFT_COMPRESSED_ENABLED;
- // socket timeout for thrift client
+ /** socket timeout for thrift client. */
private int connectionTimeoutMs = DefaultProperty.CONNECTION_TIMEOUT_MS;
- // number of selector threads for asynchronous thrift client in a clientManager
+ /** number of selector threads for asynchronous thrift client in a clientManager. */
private int selectorNumOfAsyncClientManager =
DefaultProperty.SELECTOR_NUM_OF_ASYNC_CLIENT_MANAGER;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index ba094a54ee..66836967de 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -19,14 +19,14 @@
package org.apache.iotdb.commons.client;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.util.Optional;
public class ClientManager<K, V> implements IClientManager<K, V> {
@@ -44,60 +44,44 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
}
@Override
- public V borrowClient(K node) throws IOException {
- V client;
+ public V borrowClient(K node) throws ClientManagerException {
try {
- client = pool.borrowObject(node);
- } catch (TTransportException e) {
- // External needs to check transport related exception
- throw new IOException(e);
- } catch (IOException e) {
- // External needs the IOException to check connection
- throw e;
+ return pool.borrowObject(node);
} catch (Exception e) {
- // External doesn't care of other exceptions
- String errorMessage =
- String.format(
- "Borrow client from pool for node %s failed, you need to increase dn_max_connection_for_internal_service.",
- node);
- logger.warn(errorMessage, e);
- throw new IOException(errorMessage, e);
+ throw new ClientManagerException(e);
}
- return client;
}
- @Override
- public V purelyBorrowClient(K node) {
- V client = null;
- try {
- client = pool.borrowObject(node);
- } catch (Exception ignored) {
- // Just ignore
- }
- return client;
- }
-
- // return a V client of the K node to the Manager
+ /**
+ * Return a client V for node K to the ClientManager. Note: this method is deliberately not
+ * declared in IClientManager, to make you aware that the return of a client is automatic
+ * whenever a particular client is used.
+ */
public void returnClient(K node, V client) {
- if (client != null && node != null) {
- try {
- pool.returnObject(node, client);
- } catch (Exception e) {
- logger.error(
- String.format("Return client %s for node %s to pool failed.", client, node), e);
- }
- }
+ Optional.ofNullable(node)
+ .ifPresent(
+ x -> {
+ try {
+ pool.returnObject(node, client);
+ } catch (Exception e) {
+ logger.error(
+ String.format("Return client %s for node %s to pool failed.", client, node), e);
+ }
+ });
}
@Override
public void clear(K node) {
- if (node != null) {
- try {
- pool.clear(node);
- } catch (Exception e) {
- logger.error(String.format("Clear all client in pool for node %s failed.", node), e);
- }
- }
+ Optional.ofNullable(node)
+ .ifPresent(
+ x -> {
+ try {
+ pool.clear(node);
+ } catch (Exception e) {
+ logger.error(
+ String.format("Clear all client in pool for node %s failed.", node), e);
+ }
+ });
}
@Override
@@ -105,7 +89,7 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
try {
pool.close();
} catch (Exception e) {
- logger.error("close client pool failed", e);
+ logger.error("Close client pool failed", e);
}
}
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
index c764692564..45870e9ba6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolProperty.java
@@ -37,14 +37,20 @@ public class ClientPoolProperty<V> {
}
public static class Builder<V> {
- // when the number of the client to a single node exceeds maxTotalConnectionForEachNode, the
- // current thread will block waitClientTimeoutMS, ClientManager returns NULL if there are no
- // clients after the block time
+
+ /**
+ * when the number of clients to a single node exceeds maxTotalClientForEachNode, the
+ * current thread will block waitClientTimeoutMS, ClientManager throws ClientManagerException if
+ * there are no clients after the block time.
+ */
private long waitClientTimeoutMS = DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
- // the maximum number of clients that can be applied for a node
+
+ /** the maximum number of clients that can be allocated for a node. */
private int maxTotalClientForEachNode = DefaultProperty.MAX_TOTAL_CLIENT_FOR_EACH_NODE;
- // the maximum number of clients that can be idle for a node. When the number of idle clients on
- // a node exceeds this number, newly returned clients will be released
+ /**
+ * the maximum number of clients that can be idle for a node. When the number of idle clients on
+ * a node exceeds this number, newly returned clients will be released.
+ */
private int maxIdleClientForEachNode = DefaultProperty.MAX_IDLE_CLIENT_FOR_EACH_NODE;
public Builder<V> setWaitClientTimeoutMS(long waitClientTimeoutMS) {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index 4dfaf6b362..2b74fc716f 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -19,28 +19,24 @@
package org.apache.iotdb.commons.client;
-import javax.annotation.concurrent.ThreadSafe;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
-import java.io.IOException;
+import javax.annotation.concurrent.ThreadSafe;
@ThreadSafe
public interface IClientManager<K, V> {
- // get a V client of the K node from the Manager
- V borrowClient(K node) throws IOException;
+ /** get a client V for node K from the IClientManager. */
+ V borrowClient(K node) throws ClientManagerException;
- // Get a V client of the K node from the Manager while
- // no exceptions will be thrown and no logs will be printed.
- // This interface is mainly used to process the cluster heartbeat.
- V purelyBorrowClient(K node);
-
- // clear all clients for K node
+ /** clear all clients for node K. */
void clear(K node);
- // close clientManager
+ /** close IClientManager, which means closing all clients for all nodes. */
void close();
class Factory<K, V> {
+
public IClientManager<K, V> createClientManager(IClientPoolFactory<K, V> clientPoolFactory) {
return new ClientManager<>(clientPoolFactory);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
index 7e3e95a019..5f2c7a72f6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
@@ -22,7 +22,10 @@ package org.apache.iotdb.commons.client;
import org.apache.commons.pool2.KeyedObjectPool;
public interface IClientPoolFactory<K, V> {
- // We can implement this interface in other modules and then set the corresponding expected
- // parameters and client factory classes
+
+ /**
+ * We can implement this interface in other modules and then set the corresponding expected
+ * parameters and client factory classes.
+ */
KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
similarity index 70%
copy from node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
copy to node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
index 7e3e95a019..439f25b655 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientPoolFactory.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/exception/ClientManagerException.java
@@ -17,12 +17,10 @@
* under the License.
*/
-package org.apache.iotdb.commons.client;
+package org.apache.iotdb.commons.client.exception;
-import org.apache.commons.pool2.KeyedObjectPool;
-
-public interface IClientPoolFactory<K, V> {
- // We can implement this interface in other modules and then set the corresponding expected
- // parameters and client factory classes
- KeyedObjectPool<K, V> createClientPool(ClientManager<K, V> manager);
+public class ClientManagerException extends Exception {
+ public ClientManagerException(Exception exception) {
+ super(exception);
+ }
}
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
index 5ce1746799..f7c7f05e24 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ThriftServiceThread.java
@@ -31,7 +31,7 @@ public class ThriftServiceThread extends AbstractThriftServiceThread {
/** for asynced ThriftService. */
@SuppressWarnings("squid:S107")
public ThriftServiceThread(
- TBaseAsyncProcessor processor,
+ TBaseAsyncProcessor<?> processor,
String serviceName,
String threadsName,
String bindAddress,
diff --git a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
index 05169af548..18bf4bcd38 100644
--- a/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
+++ b/node-commons/src/test/java/org/apache/iotdb/commons/client/ClientManagerTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.commons.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.mock.MockInternalRPCService;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.exception.StartupException;
@@ -34,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.mock;
@@ -251,14 +253,15 @@ public class ClientManagerTest {
// get another sync client, should wait waitClientTimeoutMS ms, throw error
SyncDataNodeInternalServiceClient syncClient2 = null;
- long start = 0, end;
+ long start = 0;
try {
start = System.nanoTime();
syncClient2 = syncClusterManager.borrowClient(endPoint);
- } catch (IOException e) {
- end = System.nanoTime();
+ } catch (ClientManagerException e) {
+ long end = System.nanoTime();
Assert.assertTrue(end - start >= waitClientTimeoutMs * 1_000_000);
- Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for node"));
+ Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
+ Assert.assertTrue(e.getMessage().contains("Timeout waiting for idle object"));
}
Assert.assertNull(syncClient2);
@@ -322,14 +325,15 @@ public class ClientManagerTest {
Assert.assertEquals(0, syncClusterManager.getPool().getNumIdle(endPoint));
// get another sync client, should wait waitClientTimeoutMS ms, throw error
- long start = 0, end;
+ long start = 0;
try {
start = System.nanoTime();
- syncClusterManager.borrowClient(endPoint);
- } catch (IOException e) {
- end = System.nanoTime();
+ syncClient1 = syncClusterManager.borrowClient(endPoint);
+ } catch (ClientManagerException e) {
+ long end = System.nanoTime();
Assert.assertTrue(end - start >= waitClientTimeoutMS * 1_000_000);
- Assert.assertTrue(e.getMessage().startsWith("Borrow client from pool for node"));
+ Assert.assertTrue(e.getCause() instanceof NoSuchElementException);
+ Assert.assertTrue(e.getMessage().contains("Timeout waiting for idle object"));
}
// return one sync client
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
index 7c5b33ff16..56acc74894 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/ClusterAuthorityFetcher.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.auth.entity.PathPrivilege;
import org.apache.iotdb.commons.auth.entity.Role;
import org.apache.iotdb.commons.auth.entity.User;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.path.PartialPath;
@@ -50,7 +51,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -139,7 +139,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
logger.error("Failed to connect to config node.");
future.setException(e);
} catch (AuthException e) {
@@ -173,7 +173,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
} else {
AuthorizerManager.getInstance().buildTSBlock(authorizerResp.getAuthorizerInfo(), future);
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
logger.error("Failed to connect to config node.");
authorizerResp.setStatus(
RpcUtils.getStatus(
@@ -209,7 +209,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
status = configNodeClient.login(req);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
logger.error("Failed to connect to config node.");
status = new TPermissionInfoResp();
status.setStatus(
@@ -236,7 +236,7 @@ public class ClusterAuthorityFetcher implements IAuthorityFetcher {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
permissionInfoResp = configNodeClient.checkUserPrivileges(req);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
logger.error("Failed to connect to config node.");
permissionInfoResp = new TPermissionInfoResp();
permissionInfoResp.setStatus(
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
index d6dd27d65c..0e21320667 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/ClusterTemplateManager.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.metadata.template;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
@@ -41,7 +42,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -98,7 +98,7 @@ public class ClusterTemplateManager implements ITemplateManager {
tsStatus);
}
return tsStatus;
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new RuntimeException(
new IoTDBException(
"create template error.", e, TSStatusCode.CREATE_TEMPLATE_ERROR.getStatusCode()));
@@ -147,7 +147,7 @@ public class ClusterTemplateManager implements ITemplateManager {
tGetAllTemplatesResp.getStatus().getMessage(),
tGetAllTemplatesResp.getStatus().getCode()));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new RuntimeException(
new IoTDBException(
"get all template error.", TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode()));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
index 943d24a6e5..50a3f2e1e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandle.java
@@ -402,7 +402,7 @@ public class SinkHandle implements ISinkHandle {
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onNewDataBlockEvent(newDataBlockEvent);
break;
- } catch (Throwable e) {
+ } catch (Exception e) {
logger.warn("Failed to send new data block event, attempt times: {}", attempt, e);
if (attempt == MAX_ATTEMPT_TIMES) {
sinkHandleListener.onFailure(SinkHandle.this, e);
@@ -442,7 +442,7 @@ public class SinkHandle implements ISinkHandle {
mppDataExchangeServiceClientManager.borrowClient(remoteEndpoint)) {
client.onEndOfDataBlockEvent(endOfDataBlockEvent);
break;
- } catch (Throwable e) {
+ } catch (Exception e) {
logger.warn("Failed to send end of data block event, attempt times: {}", attempt, e);
if (attempt == MAX_ATTEMPT_TIMES) {
logger.warn("Failed to send end of data block event after all retry", e);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
index 72bced5f55..b9a358c2c3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/TestRPCClient.java
@@ -37,9 +37,6 @@ import org.apache.iotdb.db.client.DataNodeClientPoolFactory;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
-import org.apache.thrift.TException;
-
-import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -75,7 +72,7 @@ public class TestRPCClient {
new TTriggerSnapshotLoadReq(
new DataRegionId(1).convertToTConsensusGroupId(), "snapshot_1_1662370255552"));
System.out.println(res.status);
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -87,7 +84,7 @@ public class TestRPCClient {
client.inactivatePeer(
new TInactivatePeerReq(new DataRegionId(1).convertToTConsensusGroupId()));
System.out.println(res.status);
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -97,7 +94,7 @@ public class TestRPCClient {
INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
client.removeRegionPeer(
new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -107,7 +104,7 @@ public class TestRPCClient {
INTERNAL_SERVICE_CLIENT_MANAGER.borrowClient(new TEndPoint("127.0.0.1", 9003))) {
client.addRegionPeer(
new TMaintainPeerReq(new DataRegionId(1).convertToTConsensusGroupId(), getLocation2(3)));
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
@@ -169,7 +166,7 @@ public class TestRPCClient {
TSStatus res = client.createDataRegion(req);
System.out.println(res.code + " " + res.message);
- } catch (IOException | TException e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
index 13213da4fa..64bdf7e0d2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/AnalyzeVisitor.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.analyze;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -2525,7 +2526,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
+ showDataNodesResp.getStatus().getMessage());
}
return showDataNodesResp.getDataNodeLocationList();
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getRunningDataNodeLocations():" + e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
index 42e379beb9..2c70aaf3d0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/ClusterPartitionFetcher.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
@@ -62,6 +63,7 @@ import java.util.List;
import java.util.Map;
public class ClusterPartitionFetcher implements IPartitionFetcher {
+
private static final Logger logger = LoggerFactory.getLogger(ClusterPartitionFetcher.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -74,6 +76,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
.createClientManager(new DataNodeClientPoolFactory.ConfigNodeClientPoolFactory());
private static final class ClusterPartitionFetcherHolder {
+
private static final ClusterPartitionFetcher INSTANCE = new ClusterPartitionFetcher();
private ClusterPartitionFetcherHolder() {}
@@ -115,7 +118,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
}
return schemaPartition;
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
logger.warn("Get Schema Partition error", e);
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaPartition():" + e.getMessage());
@@ -147,7 +150,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
}
}
return schemaPartition;
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getOrCreateSchemaPartition():" + e.getMessage());
}
@@ -164,7 +167,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
constructSchemaNodeManagementPartitionReq(patternTree, level));
return parseSchemaNodeManagementPartitionResp(schemaNodeManagementResp);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getSchemaNodeManagementPartition():" + e.getMessage());
}
@@ -188,7 +191,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
"An error occurred when executing getDataPartition():"
+ dataPartitionTableResp.getStatus().getMessage());
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getDataPartition():" + e.getMessage());
}
@@ -214,7 +217,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
"An error occurred when executing getDataPartition():"
+ dataPartitionTableResp.getStatus().getMessage());
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getDataPartition():" + e.getMessage());
}
@@ -239,7 +242,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
"An error occurred when executing getOrCreateDataPartition():"
+ dataPartitionTableResp.getStatus().getMessage());
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
}
@@ -272,7 +275,7 @@ public class ClusterPartitionFetcher implements IPartitionFetcher {
dataPartitionTableResp.getStatus().getMessage(),
dataPartitionTableResp.getStatus().getCode()));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getOrCreateDataPartition():" + e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
index 3b5b25967f..040c568b40 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/analyze/cache/PartitionCache.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
@@ -56,7 +57,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -184,7 +184,8 @@ public class PartitionCache {
* @param devicePaths the devices that need to hit
*/
private void fetchStorageGroupAndUpdateCache(
- StorageGroupCacheResult<?> result, List<String> devicePaths) throws IOException, TException {
+ StorageGroupCacheResult<?> result, List<String> devicePaths)
+ throws ClientManagerException, TException {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
storageGroupCacheLock.writeLock().lock();
@@ -215,7 +216,7 @@ public class PartitionCache {
*/
private void createStorageGroupAndUpdateCache(
StorageGroupCacheResult<?> result, List<String> devicePaths)
- throws IOException, MetadataException, TException {
+ throws ClientManagerException, MetadataException, TException {
try (ConfigNodeClient client =
configNodeClientManager.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
storageGroupCacheLock.writeLock().lock();
@@ -336,7 +337,7 @@ public class PartitionCache {
throw new StatementAnalyzeException("Failed to get database Map in three attempts.");
}
}
- } catch (TException | MetadataException | IOException e) {
+ } catch (TException | MetadataException | ClientManagerException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getDeviceToStorageGroup():" + e.getMessage());
}
@@ -421,7 +422,7 @@ public class PartitionCache {
throw new RuntimeException(
"Failed to get replicaSet of consensus group[id= " + consensusGroupId + "]");
}
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
throw new StatementAnalyzeException(
"An error occurred when executing getRegionReplicaSet():" + e.getMessage());
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 953550f7a6..ced8aa4dfd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.commons.exception.IoTDBException;
@@ -216,7 +217,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -235,7 +236,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
TShowStorageGroupResp resp = client.showStorageGroup(storageGroupPathPattern);
// build TSBlock
ShowStorageGroupTask.buildTSBlock(resp.getStorageGroupInfoMap(), future);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -254,7 +255,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
storageGroupNum = resp.getCount();
// build TSBlock
CountStorageGroupTask.buildTSBlock(storageGroupNum, future);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -278,7 +279,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -400,7 +401,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | IOException | TException e) {
future.setException(e);
}
return future;
@@ -419,7 +420,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -439,7 +440,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
}
// convert triggerTable and buildTsBlock
ShowFunctionsTask.buildTsBlock(getUDFTableResp.getAllUDFInformation(), future);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
@@ -575,7 +576,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException | IOException e) {
future.setException(e);
}
return future;
@@ -593,7 +594,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -614,7 +615,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
}
// convert triggerTable and buildTsBlock
ShowTriggersTask.buildTsBlock(getTriggerTableResp.getAllTriggerInformation(), future);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
@@ -642,7 +643,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -657,7 +658,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
tsStatus = client.merge();
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
@@ -680,7 +681,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
tsStatus = client.flush(tFlushReq);
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
@@ -703,7 +704,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
tsStatus = client.clearCache();
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
@@ -726,7 +727,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
tsStatus = client.loadConfiguration();
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
@@ -749,7 +750,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
// Send request to some API server
tsStatus = client.setSystemStatus(status.getStatus());
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
} else {
@@ -770,7 +771,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
try (ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.configNodeRegionId)) {
showClusterResp = client.showCluster();
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
if (showClusterResp.getConfigNodeList() == null) {
future.setException(new TException(MSG_RECONNECTION_FAIL));
} else {
@@ -816,7 +817,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
}
}
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
@@ -847,7 +848,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
showRegionResp.getStatus().message, showRegionResp.getStatus().code));
return future;
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
@@ -869,7 +870,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
showDataNodesResp.getStatus().message, showDataNodesResp.getStatus().code));
return future;
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
@@ -891,7 +892,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
showConfigNodesResp.getStatus().message, showConfigNodesResp.getStatus().code));
return future;
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
// build TSBlock
@@ -1027,7 +1028,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -1052,7 +1053,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -1111,7 +1112,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -1326,7 +1327,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -1475,7 +1476,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -1493,7 +1494,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
return future;
@@ -1512,7 +1513,7 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
}
// convert cqList and buildTsBlock
ShowContinuousQueriesTask.buildTsBlock(showCQResp.getCqList(), future);
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
future.setException(e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
index e71f331cdf..bebb38f2b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/AbstractFragInsStateTracker.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.QueryStateMachine;
@@ -35,7 +36,6 @@ import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceInfoResp;
import org.apache.thrift.TException;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -70,7 +70,7 @@ public abstract class AbstractFragInsStateTracker implements IFragInstanceStateT
public abstract void abort();
protected FragmentInstanceInfo fetchInstanceInfo(FragmentInstance instance)
- throws TException, IOException {
+ throws ClientManagerException, TException {
TEndPoint endPoint = instance.getHostDataNode().internalEndPoint;
if (isInstanceRunningLocally(endPoint)) {
FragmentInstanceInfo info =
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
index 613e0011ba..80b90b9a21 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FixedRateFragInsStateTracker.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -35,7 +36,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -131,7 +131,7 @@ public class FixedRateFragInsStateTracker extends AbstractFragInsStateTracker {
updateQueryState(instance.getId(), instanceInfo);
}
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
// TODO: do nothing ?
logger.warn("error happened while fetching query state", e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 1ba07ccb79..db04e97f0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -47,7 +48,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -193,7 +193,7 @@ public class FragmentInstanceDispatcherImpl implements IFragInstanceDispatcher {
TSStatusCode.EXECUTE_STATEMENT_ERROR,
String.format("unknown query type [%s]", instance.getType())));
}
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
logger.warn("can't connect to node {}", endPoint, e);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
index bda8205edf..14bace8dc2 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/SimpleQueryTerminator.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.plan.scheduler;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
@@ -28,10 +29,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.mpp.rpc.thrift.TCancelQueryReq;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -99,11 +100,11 @@ public class SimpleQueryTerminator implements IQueryTerminator {
try (SyncDataNodeInternalServiceClient client =
internalServiceClientManager.borrowClient(endPoint)) {
client.cancelQuery(new TCancelQueryReq(queryId.getId(), unfinishedFIs));
- } catch (IOException e) {
+ } catch (ClientManagerException e) {
logger.warn("can't connect to node {}", endPoint, e);
// we shouldn't return here and need to cancel queryTasks in other nodes
succeed = false;
- } catch (Throwable t) {
+ } catch (TException t) {
logger.warn("cancel query {} on node {} failed.", queryId.getId(), endPoint, t);
// we shouldn't return here and need to cancel queryTasks in other nodes
succeed = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 956f7c2ae9..80d9ec3516 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -50,7 +51,6 @@ import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -135,7 +135,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
logger.warn(loadResp.message);
throw new FragmentInstanceDispatchException(loadResp.status);
}
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
logger.warn("can't connect to node {}", endPoint, e);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
@@ -222,7 +222,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher {
logger.warn(loadResp.message);
throw new FragmentInstanceDispatchException(loadResp.status);
}
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
logger.warn("can't connect to node {}", endPoint, e);
TSStatus status = new TSStatus();
status.setCode(TSStatusCode.SYNC_CONNECTION_ERROR.getStatusCode());
diff --git a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
index fc1a7d056b..8d5eb5cdcf 100644
--- a/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/trigger/executor/TriggerFireVisitor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.trigger.executor;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.commons.path.PartialPath;
@@ -340,7 +341,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
Thread.sleep(4000);
}
}
- } catch (IOException | TException e) {
+ } catch (ClientManagerException | TException e) {
// ClientManagerException means that we failed to borrow client, possibly because corresponding
// DataNode is down.
// TException means there's a timeout or broken connection.
@@ -353,7 +354,10 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
e);
// update TDataNodeLocation of stateful trigger through config node
updateLocationOfStatefulTrigger(triggerName, tDataNodeLocation.getDataNodeId());
- } catch (Throwable e) {
+ } catch (InterruptedException e) {
+ LOGGER.warn("{} interrupted when sleep", triggerName);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
LOGGER.warn(
"Error occurred when trying to fire trigger({}) on TEndPoint: {}, the cause is: {}",
triggerName,
@@ -412,7 +416,7 @@ public class TriggerFireVisitor extends PlanVisitor<TriggerFireResult, TriggerEv
}
}
return false;
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException | IOException e) {
LOGGER.error(
"Failed to update location of stateful trigger({}) through config node and the cause is {}.",
triggerName,
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
index b11203dd11..0c8ebf9f73 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SinkHandleTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SinkHandleListener;
@@ -74,7 +75,7 @@ public class SinkHandleTest {
Mockito.doNothing()
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -223,7 +224,7 @@ public class SinkHandleTest {
Mockito.doNothing()
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -423,7 +424,7 @@ public class SinkHandleTest {
Mockito.doThrow(mockException)
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -532,7 +533,7 @@ public class SinkHandleTest {
Mockito.doNothing()
.when(mockClient)
.onNewDataBlockEvent(Mockito.any(TNewDataBlockEvent.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
index 7c345b5d9f..aa9adbace6 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/exchange/SourceHandleTest.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.mpp.execution.exchange;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncDataNodeMPPDataExchangeServiceClient;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeManager.SourceHandleListener;
@@ -37,7 +38,6 @@ import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
-import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -81,7 +81,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -197,7 +197,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -351,7 +351,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -523,7 +523,7 @@ public class SourceHandleTest {
Mockito.doThrow(mockException)
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}
@@ -606,7 +606,7 @@ public class SourceHandleTest {
})
.when(mockClient)
.getDataBlock(Mockito.any(TGetDataBlockRequest.class));
- } catch (TException | IOException e) {
+ } catch (ClientManagerException | TException e) {
e.printStackTrace();
Assert.fail();
}