You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/30 03:57:58 UTC

[iotdb] branch rel/1.0 updated: [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891) (#8911)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.0
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.0 by this push:
     new c395ef5992 [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891) (#8911)
c395ef5992 is described below

commit c395ef599297ba85c01f00aaf89236fad4feb30c
Author: Potato <ta...@apache.org>
AuthorDate: Mon Jan 30 11:57:52 2023 +0800

    [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891) (#8911)
---
 .../apache/iotdb/commons/client/ClientManager.java | 13 ++++-----
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 33 ++++++----------------
 .../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++-------
 .../db/service/DataNodeServerCommandLine.java      | 17 +++++++----
 .../iotdb/db/service/RegionMigrateService.java     | 14 +++++++--
 5 files changed, 55 insertions(+), 50 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 770c729bbe..3ee338d4ec 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -57,9 +57,10 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
   }
 
   /**
-   * return a client V for node K to the ClientManager Note: We do not define this interface in
-   * IClientManager to make you aware that the return of a client is automatic whenever a particular
-   * client is used.
+   * return a client V for node K to the ClientManager.
+   *
+   * <p>Note: We do not define this interface in IClientManager to make you aware that the return of
+   * a client is automatic whenever a particular client is used.
    */
   public void returnClient(K node, V client) {
     Optional.ofNullable(node)
@@ -89,10 +90,6 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public void close() {
-    try {
-      pool.close();
-    } catch (Exception e) {
-      logger.warn("Close client pool failed", e);
-    }
+    pool.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 702818e2ef..ffd25ad9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -27,10 +27,8 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
-import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
@@ -118,8 +116,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -141,7 +137,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   private static final int RETRY_INTERVAL_MS = 1000;
 
-  private long connectionTimeout = ClientPoolProperty.DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
+  private final long connectionTimeout;
 
   private IConfigNodeRPCService.Iface client;
 
@@ -163,23 +159,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   TProtocolFactory protocolFactory;
 
-  public ConfigNodeClient() throws TException {
-    // Read config nodes from configuration
-    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
-    protocolFactory =
-        CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
-            ? new TCompactProtocol.Factory()
-            : new TBinaryProtocol.Factory();
-
-    init();
-  }
-
   public ConfigNodeClient(
+      List<TEndPoint> configNodes,
       TProtocolFactory protocolFactory,
       long connectionTimeout,
       ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager)
       throws TException {
-    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
+    this.configNodes = configNodes;
     this.protocolFactory = protocolFactory;
     this.connectionTimeout = connectionTimeout;
     this.clientManager = clientManager;
@@ -274,11 +260,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   @Override
   public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(configNodeRegionId, this);
-    } else {
-      invalidate();
-    }
+    clientManager.returnClient(configNodeRegionId, this);
   }
 
   @Override
@@ -1859,7 +1841,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
           SyncThriftClientWithErrorHandler.newErrorHandler(
               ConfigNodeClient.class,
               ConfigNodeClient.class.getConstructor(
-                  TProtocolFactory.class, long.class, clientManager.getClass()),
+                  List.class, TProtocolFactory.class, long.class, clientManager.getClass()),
+              ConfigNodeInfo.getInstance().getLatestConfigNodes(),
               thriftClientProperty.getProtocolFactory(),
               thriftClientProperty.getConnectionTimeoutMs(),
               clientManager));
@@ -1868,7 +1851,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     @Override
     public boolean validateObject(
         ConfigNodeRegionId configNodeRegionId, PooledObject<ConfigNodeClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().getTransport().isOpen();
+      return Optional.ofNullable(pooledObject.getObject().getTransport())
+          .map(TTransport::isOpen)
+          .orElse(false);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3c5f1e49dd..4512b4ea05 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -51,6 +52,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -100,6 +102,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
 
 public class DataNode implements DataNodeMBean {
+
   private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -232,10 +235,11 @@ public class DataNode implements DataNodeMBean {
     int retry = DEFAULT_RETRY;
     TSystemConfigurationResp configurationResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         configurationResp = configNodeClient.getSystemConfiguration();
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn(
             "Cannot pull system configurations from ConfigNode-leader, because: {}",
@@ -344,10 +348,11 @@ public class DataNode implements DataNodeMBean {
     req.setClusterName(config.getClusterName());
     TDataNodeRegisterResp dataNodeRegisterResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeRegisterResp = configNodeClient.registerDataNode(req);
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
         ConfigNodeInfo.getInstance().loadConfigNodeList();
@@ -403,10 +408,11 @@ public class DataNode implements DataNodeMBean {
     req.setDataNodeConfiguration(generateDataNodeConfiguration());
     TDataNodeRestartResp dataNodeRestartResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeRestartResp = configNodeClient.restartDataNode(req);
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn(
             "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
@@ -652,7 +658,8 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       List<String> jarNameList =
           udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
       TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
@@ -664,7 +671,7 @@ public class DataNode implements DataNodeMBean {
         UDFExecutableManager.getInstance()
             .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
       }
-    } catch (IOException | TException e) {
+    } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
     }
   }
@@ -761,7 +768,8 @@ public class DataNode implements DataNodeMBean {
 
   private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
       throws StartupException {
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       List<String> jarNameList =
           triggerInformationList.stream()
               .map(TriggerInformation::getJarName)
@@ -775,7 +783,7 @@ public class DataNode implements DataNodeMBean {
         TriggerExecutableManager.getInstance()
             .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
       }
-    } catch (IOException | TException e) {
+    } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 2b254b2296..a1a6d1621a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -22,12 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -91,7 +93,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
    * @param args id or ip:rpc_port for removed datanode
    */
   private void doRemoveDataNode(String[] args)
-      throws BadNodeUrlException, TException, IoTDBException {
+      throws BadNodeUrlException, TException, IoTDBException, ClientManagerException {
 
     if (args.length != 2) {
       LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
@@ -109,7 +111,8 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     }
     LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations);
     TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
       LOGGER.info("Remove result {} ", removeResp);
       if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -145,17 +148,19 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     // Below supports multiple datanode deletion, split by ',', and is reserved for extension
     try {
       List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args);
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
+      try (ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeLocations =
             client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
                 .map(TDataNodeConfiguration::getLocation)
                 .filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
                 .collect(Collectors.toList());
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         LOGGER.error("Get data node locations failed", e);
       }
     } catch (BadNodeUrlException e) {
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
+      try (ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         for (String id : args.split(",")) {
           if (!isNumeric(id)) {
             LOGGER.warn("Incorrect id format {}, skipped...", id);
@@ -174,7 +179,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
             dataNodeLocations.add(nodeLocationResult.get(0));
           }
         }
-      } catch (TException e1) {
+      } catch (TException | ClientManagerException e1) {
         LOGGER.error("Get data node locations failed", e);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 81d404bc1b..3667219739 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -50,6 +53,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class RegionMigrateService implements IService {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
 
   public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
@@ -153,6 +157,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class RegionMigratePool extends AbstractPoolManager {
+
     private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
 
     private RegionMigratePool() {
@@ -178,6 +183,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class AddRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
 
     // The RegionGroup that shall perform the add peer process
@@ -266,6 +272,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class RemoveRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
 
     private final TConsensusGroupId tRegionId;
@@ -354,6 +361,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class DeleteOldRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
 
     private final TConsensusGroupId tRegionId;
@@ -453,6 +461,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class Holder {
+
     private static final RegionMigrateService INSTANCE = new RegionMigrateService();
 
     private Holder() {}
@@ -497,9 +506,10 @@ public class RegionMigrateService implements IService {
   }
 
   private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
-      throws TException {
+      throws TException, ClientManagerException {
     TSStatus status;
-    try (ConfigNodeClient client = new ConfigNodeClient()) {
+    try (ConfigNodeClient client =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       status = client.reportRegionMigrateResult(req);
       LOGGER.info(
           "{}, Report region {} migrate result {} to Config node succeed, result: {}",