You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/28 15:45:29 UTC

[iotdb] branch jira5425 created (now 93230e34d9)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5425
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 93230e34d9 [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)

This branch includes the following new commits:

     new 93230e34d9 [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5425
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 93230e34d9e7988b52c9f7803d682b727f222579
Author: Potato <ta...@apache.org>
AuthorDate: Sat Jan 28 20:35:39 2023 +0800

    [IOTDB-5425] Consolidate all ConfigNodeClient to be managed by clientManager (#8891)
---
 .../apache/iotdb/commons/client/ClientManager.java | 13 ++++-----
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 33 ++++++----------------
 .../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++-------
 .../db/service/DataNodeServerCommandLine.java      | 17 +++++++----
 .../iotdb/db/service/RegionMigrateService.java     | 14 +++++++--
 5 files changed, 55 insertions(+), 50 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 770c729bbe..3ee338d4ec 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -57,9 +57,10 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
   }
 
   /**
-   * return a client V for node K to the ClientManager Note: We do not define this interface in
-   * IClientManager to make you aware that the return of a client is automatic whenever a particular
-   * client is used.
+   * return a client V for node K to the ClientManager.
+   *
+   * <p>Note: We do not define this interface in IClientManager to make you aware that the return of
+   * a client is automatic whenever a particular client is used.
    */
   public void returnClient(K node, V client) {
     Optional.ofNullable(node)
@@ -89,10 +90,6 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public void close() {
-    try {
-      pool.close();
-    } catch (Exception e) {
-      logger.warn("Close client pool failed", e);
-    }
+    pool.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 702818e2ef..ffd25ad9c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -27,10 +27,8 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
-import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
@@ -118,8 +116,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -141,7 +137,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   private static final int RETRY_INTERVAL_MS = 1000;
 
-  private long connectionTimeout = ClientPoolProperty.DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
+  private final long connectionTimeout;
 
   private IConfigNodeRPCService.Iface client;
 
@@ -163,23 +159,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   TProtocolFactory protocolFactory;
 
-  public ConfigNodeClient() throws TException {
-    // Read config nodes from configuration
-    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
-    protocolFactory =
-        CommonDescriptor.getInstance().getConfig().isRpcThriftCompressionEnabled()
-            ? new TCompactProtocol.Factory()
-            : new TBinaryProtocol.Factory();
-
-    init();
-  }
-
   public ConfigNodeClient(
+      List<TEndPoint> configNodes,
       TProtocolFactory protocolFactory,
       long connectionTimeout,
       ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager)
       throws TException {
-    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
+    this.configNodes = configNodes;
     this.protocolFactory = protocolFactory;
     this.connectionTimeout = connectionTimeout;
     this.clientManager = clientManager;
@@ -274,11 +260,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   @Override
   public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(configNodeRegionId, this);
-    } else {
-      invalidate();
-    }
+    clientManager.returnClient(configNodeRegionId, this);
   }
 
   @Override
@@ -1859,7 +1841,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
           SyncThriftClientWithErrorHandler.newErrorHandler(
               ConfigNodeClient.class,
               ConfigNodeClient.class.getConstructor(
-                  TProtocolFactory.class, long.class, clientManager.getClass()),
+                  List.class, TProtocolFactory.class, long.class, clientManager.getClass()),
+              ConfigNodeInfo.getInstance().getLatestConfigNodes(),
               thriftClientProperty.getProtocolFactory(),
               thriftClientProperty.getConnectionTimeoutMs(),
               clientManager));
@@ -1868,7 +1851,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     @Override
     public boolean validateObject(
         ConfigNodeRegionId configNodeRegionId, PooledObject<ConfigNodeClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().getTransport().isOpen();
+      return Optional.ofNullable(pooledObject.getObject().getTransport())
+          .map(TTransport::isOpen)
+          .orElse(false);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3c5f1e49dd..4512b4ea05 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -51,6 +52,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -100,6 +102,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
 
 public class DataNode implements DataNodeMBean {
+
   private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
@@ -232,10 +235,11 @@ public class DataNode implements DataNodeMBean {
     int retry = DEFAULT_RETRY;
     TSystemConfigurationResp configurationResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         configurationResp = configNodeClient.getSystemConfiguration();
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn(
             "Cannot pull system configurations from ConfigNode-leader, because: {}",
@@ -344,10 +348,11 @@ public class DataNode implements DataNodeMBean {
     req.setClusterName(config.getClusterName());
     TDataNodeRegisterResp dataNodeRegisterResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeRegisterResp = configNodeClient.registerDataNode(req);
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
         ConfigNodeInfo.getInstance().loadConfigNodeList();
@@ -403,10 +408,11 @@ public class DataNode implements DataNodeMBean {
     req.setDataNodeConfiguration(generateDataNodeConfiguration());
     TDataNodeRestartResp dataNodeRestartResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeRestartResp = configNodeClient.restartDataNode(req);
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn(
             "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
@@ -652,7 +658,8 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       List<String> jarNameList =
           udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
       TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
@@ -664,7 +671,7 @@ public class DataNode implements DataNodeMBean {
         UDFExecutableManager.getInstance()
             .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
       }
-    } catch (IOException | TException e) {
+    } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
     }
   }
@@ -761,7 +768,8 @@ public class DataNode implements DataNodeMBean {
 
   private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
       throws StartupException {
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       List<String> jarNameList =
           triggerInformationList.stream()
               .map(TriggerInformation::getJarName)
@@ -775,7 +783,7 @@ public class DataNode implements DataNodeMBean {
         TriggerExecutableManager.getInstance()
             .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
       }
-    } catch (IOException | TException e) {
+    } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 2b254b2296..a1a6d1621a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -22,12 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -91,7 +93,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
    * @param args id or ip:rpc_port for removed datanode
    */
   private void doRemoveDataNode(String[] args)
-      throws BadNodeUrlException, TException, IoTDBException {
+      throws BadNodeUrlException, TException, IoTDBException, ClientManagerException {
 
     if (args.length != 2) {
       LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
@@ -109,7 +111,8 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     }
     LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations);
     TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
       LOGGER.info("Remove result {} ", removeResp);
       if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -145,17 +148,19 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     // Below supports multiple datanode deletion, split by ',', and is reserved for extension
     try {
       List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args);
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
+      try (ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeLocations =
             client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
                 .map(TDataNodeConfiguration::getLocation)
                 .filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
                 .collect(Collectors.toList());
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         LOGGER.error("Get data node locations failed", e);
       }
     } catch (BadNodeUrlException e) {
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
+      try (ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         for (String id : args.split(",")) {
           if (!isNumeric(id)) {
             LOGGER.warn("Incorrect id format {}, skipped...", id);
@@ -174,7 +179,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
             dataNodeLocations.add(nodeLocationResult.get(0));
           }
         }
-      } catch (TException e1) {
+      } catch (TException | ClientManagerException e1) {
         LOGGER.error("Get data node locations failed", e);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 81d404bc1b..3667219739 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -50,6 +53,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class RegionMigrateService implements IService {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
 
   public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
@@ -153,6 +157,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class RegionMigratePool extends AbstractPoolManager {
+
     private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
 
     private RegionMigratePool() {
@@ -178,6 +183,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class AddRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
 
     // The RegionGroup that shall perform the add peer process
@@ -266,6 +272,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class RemoveRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
 
     private final TConsensusGroupId tRegionId;
@@ -354,6 +361,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class DeleteOldRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
 
     private final TConsensusGroupId tRegionId;
@@ -453,6 +461,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class Holder {
+
     private static final RegionMigrateService INSTANCE = new RegionMigrateService();
 
     private Holder() {}
@@ -497,9 +506,10 @@ public class RegionMigrateService implements IService {
   }
 
   private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
-      throws TException {
+      throws TException, ClientManagerException {
     TSStatus status;
-    try (ConfigNodeClient client = new ConfigNodeClient()) {
+    try (ConfigNodeClient client =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       status = client.reportRegionMigrateResult(req);
       LOGGER.info(
           "{}, Report region {} migrate result {} to Config node succeed, result: {}",