You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ca...@apache.org on 2023/01/28 12:35:46 UTC
[iotdb] branch master updated: [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)
This is an automated email from the ASF dual-hosted git repository.
caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ca84f2f904 [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)
ca84f2f904 is described below
commit ca84f2f9044432e072723ddba28c94f9bef4efc5
Author: Potato <ta...@apache.org>
AuthorDate: Sat Jan 28 20:35:39 2023 +0800
[IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)
---
.../apache/iotdb/commons/client/ClientManager.java | 13 ++++-----
.../apache/iotdb/db/client/ConfigNodeClient.java | 34 +++++++---------------
.../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++-------
.../db/service/DataNodeServerCommandLine.java | 17 +++++++----
.../iotdb/db/service/RegionMigrateService.java | 14 +++++++--
5 files changed, 56 insertions(+), 50 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 770c729bbe..3ee338d4ec 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -57,9 +57,10 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
}
/**
- * return a client V for node K to the ClientManager Note: We do not define this interface in
- * IClientManager to make you aware that the return of a client is automatic whenever a particular
- * client is used.
+ * return a client V for node K to the ClientManager.
+ *
+ * <p>Note: We do not define this interface in IClientManager to make you aware that the return of
+ * a client is automatic whenever a particular client is used.
*/
public void returnClient(K node, V client) {
Optional.ofNullable(node)
@@ -89,10 +90,6 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
@Override
public void close() {
- try {
- pool.close();
- } catch (Exception e) {
- logger.warn("Close client pool failed", e);
- }
+ pool.close();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 2d7b8a26ac..48133e9981 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -27,10 +27,8 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.ThriftClient;
import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
-import org.apache.iotdb.commons.client.property.ClientPoolProperty;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
@@ -127,8 +125,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -150,7 +146,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
private static final int RETRY_INTERVAL_MS = 1000;
- private long connectionTimeout = ClientPoolProperty.DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
+ private final long connectionTimeout;
private IConfigNodeRPCService.Iface client;
@@ -172,23 +168,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
TProtocolFactory protocolFactory;
- public ConfigNodeClient() throws TException {
- // Read config nodes from configuration
- configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
- protocolFactory =
- CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
- ? new TCompactProtocol.Factory()
- : new TBinaryProtocol.Factory();
-
- init();
- }
-
public ConfigNodeClient(
+ List<TEndPoint> configNodes,
TProtocolFactory protocolFactory,
long connectionTimeout,
ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager)
throws TException {
- configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
+ this.configNodes = configNodes;
this.protocolFactory = protocolFactory;
this.connectionTimeout = connectionTimeout;
this.clientManager = clientManager;
@@ -283,11 +269,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
@Override
public void close() {
- if (clientManager != null) {
- clientManager.returnClient(configNodeRegionId, this);
- } else {
- invalidate();
- }
+ clientManager.returnClient(configNodeRegionId, this);
}
@Override
@@ -1941,6 +1923,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
}
public static class Factory extends ThriftClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
public Factory(
ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
ThriftClientProperty thriftClientProperty) {
@@ -1960,7 +1943,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
SyncThriftClientWithErrorHandler.newErrorHandler(
ConfigNodeClient.class,
ConfigNodeClient.class.getConstructor(
- TProtocolFactory.class, long.class, clientManager.getClass()),
+ List.class, TProtocolFactory.class, long.class, clientManager.getClass()),
+ ConfigNodeInfo.getInstance().getLatestConfigNodes(),
thriftClientProperty.getProtocolFactory(),
thriftClientProperty.getConnectionTimeoutMs(),
clientManager));
@@ -1969,7 +1953,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
@Override
public boolean validateObject(
ConfigNodeRegionId configNodeRegionId, PooledObject<ConfigNodeClient> pooledObject) {
- return pooledObject.getObject() != null && pooledObject.getObject().getTransport().isOpen();
+ return Optional.ofNullable(pooledObject.getObject().getTransport())
+ .map(TTransport::isOpen)
+ .orElse(false);
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index e3f50f7639..29efb97a7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -51,6 +52,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -98,6 +100,7 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
public class DataNode implements DataNodeMBean {
+
private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -229,10 +232,11 @@ public class DataNode implements DataNodeMBean {
int retry = DEFAULT_RETRY;
TSystemConfigurationResp configurationResp = null;
while (retry > 0) {
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
configurationResp = configNodeClient.getSystemConfiguration();
break;
- } catch (TException e) {
+ } catch (TException | ClientManagerException e) {
// Read ConfigNodes from system.properties and retry
logger.warn(
"Cannot pull system configurations from ConfigNode-leader, because: {}",
@@ -341,10 +345,11 @@ public class DataNode implements DataNodeMBean {
req.setClusterName(config.getClusterName());
TDataNodeRegisterResp dataNodeRegisterResp = null;
while (retry > 0) {
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
dataNodeRegisterResp = configNodeClient.registerDataNode(req);
break;
- } catch (TException e) {
+ } catch (TException | ClientManagerException e) {
// Read ConfigNodes from system.properties and retry
logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
ConfigNodeInfo.getInstance().loadConfigNodeList();
@@ -398,10 +403,11 @@ public class DataNode implements DataNodeMBean {
req.setDataNodeConfiguration(generateDataNodeConfiguration());
TDataNodeRestartResp dataNodeRestartResp = null;
while (retry > 0) {
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
dataNodeRestartResp = configNodeClient.restartDataNode(req);
break;
- } catch (TException e) {
+ } catch (TException | ClientManagerException e) {
// Read ConfigNodes from system.properties and retry
logger.warn(
"Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
@@ -643,7 +649,8 @@ public class DataNode implements DataNodeMBean {
}
private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
List<String> jarNameList =
udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
@@ -655,7 +662,7 @@ public class DataNode implements DataNodeMBean {
UDFExecutableManager.getInstance()
.saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
}
- } catch (IOException | TException e) {
+ } catch (IOException | TException | ClientManagerException e) {
throw new StartupException(e);
}
}
@@ -752,7 +759,8 @@ public class DataNode implements DataNodeMBean {
private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
throws StartupException {
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
List<String> jarNameList =
triggerInformationList.stream()
.map(TriggerInformation::getJarName)
@@ -766,7 +774,7 @@ public class DataNode implements DataNodeMBean {
TriggerExecutableManager.getInstance()
.saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
}
- } catch (IOException | TException e) {
+ } catch (IOException | TException | ClientManagerException e) {
throw new StartupException(e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 2b254b2296..a1a6d1621a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -22,12 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.BadNodeUrlException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -91,7 +93,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
* @param args id or ip:rpc_port for removed datanode
*/
private void doRemoveDataNode(String[] args)
- throws BadNodeUrlException, TException, IoTDBException {
+ throws BadNodeUrlException, TException, IoTDBException, ClientManagerException {
if (args.length != 2) {
LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
@@ -109,7 +111,8 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
}
LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations);
TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
- try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+ try (ConfigNodeClient configNodeClient =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
LOGGER.info("Remove result {} ", removeResp);
if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -145,17 +148,19 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
// Below supports multiple datanode deletion, split by ',', and is reserved for extension
try {
List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args);
- try (ConfigNodeClient client = new ConfigNodeClient()) {
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
dataNodeLocations =
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
.map(TDataNodeConfiguration::getLocation)
.filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
.collect(Collectors.toList());
- } catch (TException e) {
+ } catch (TException | ClientManagerException e) {
LOGGER.error("Get data node locations failed", e);
}
} catch (BadNodeUrlException e) {
- try (ConfigNodeClient client = new ConfigNodeClient()) {
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
for (String id : args.split(",")) {
if (!isNumeric(id)) {
LOGGER.warn("Incorrect id format {}, skipped...", id);
@@ -174,7 +179,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
dataNodeLocations.add(nodeLocationResult.get(0));
}
}
- } catch (TException e1) {
+ } catch (TException | ClientManagerException e1) {
LOGGER.error("Get data node locations failed", e);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 327efc3787..b027be04ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.engine.StorageEngine;
@@ -50,6 +53,7 @@ import java.util.HashMap;
import java.util.Map;
public class RegionMigrateService implements IService {
+
private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
@@ -153,6 +157,7 @@ public class RegionMigrateService implements IService {
}
private static class RegionMigratePool extends AbstractPoolManager {
+
private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
private RegionMigratePool() {
@@ -178,6 +183,7 @@ public class RegionMigrateService implements IService {
}
private static class AddRegionPeerTask implements Runnable {
+
private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
// The RegionGroup that shall perform the add peer process
@@ -266,6 +272,7 @@ public class RegionMigrateService implements IService {
}
private static class RemoveRegionPeerTask implements Runnable {
+
private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
private final TConsensusGroupId tRegionId;
@@ -354,6 +361,7 @@ public class RegionMigrateService implements IService {
}
private static class DeleteOldRegionPeerTask implements Runnable {
+
private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
private final TConsensusGroupId tRegionId;
@@ -453,6 +461,7 @@ public class RegionMigrateService implements IService {
}
private static class Holder {
+
private static final RegionMigrateService INSTANCE = new RegionMigrateService();
private Holder() {}
@@ -497,9 +506,10 @@ public class RegionMigrateService implements IService {
}
private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
- throws TException {
+ throws TException, ClientManagerException {
TSStatus status;
- try (ConfigNodeClient client = new ConfigNodeClient()) {
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
status = client.reportRegionMigrateResult(req);
LOGGER.info(
"{}, Report region {} migrate result {} to Config node succeed, result: {}",