You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2023/01/17 07:45:43 UTC

[iotdb] branch jira5425 created (now 46e9a234d2)

This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a change to branch jira5425
in repository https://gitbox.apache.org/repos/asf/iotdb.git


      at 46e9a234d2 finish

This branch includes the following new commits:

     new 46e9a234d2 finish

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: finish

Posted by ta...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira5425
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 46e9a234d27cd96f26de142dbcb39e6f6aef23cf
Author: OneSizeFitQuorum <ta...@apache.org>
AuthorDate: Tue Jan 17 15:45:27 2023 +0800

    finish
    
    Signed-off-by: OneSizeFitQuorum <ta...@apache.org>
---
 .../apache/iotdb/commons/client/ClientManager.java | 13 ++++-----
 .../apache/iotdb/db/client/ConfigNodeClient.java   | 33 +++++++---------------
 .../java/org/apache/iotdb/db/service/DataNode.java | 28 +++++++++++-------
 .../db/service/DataNodeServerCommandLine.java      | 17 +++++++----
 .../iotdb/db/service/RegionMigrateService.java     | 14 +++++++--
 5 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 770c729bbe..3ee338d4ec 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -57,9 +57,10 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
   }
 
   /**
-   * return a client V for node K to the ClientManager Note: We do not define this interface in
-   * IClientManager to make you aware that the return of a client is automatic whenever a particular
-   * client is used.
+   * return a client V for node K to the ClientManager.
+   *
+   * <p>Note: We do not define this interface in IClientManager to make you aware that the return of
+   * a client is automatic whenever a particular client is used.
    */
   public void returnClient(K node, V client) {
     Optional.ofNullable(node)
@@ -89,10 +90,6 @@ public class ClientManager<K, V> implements IClientManager<K, V> {
 
   @Override
   public void close() {
-    try {
-      pool.close();
-    } catch (Exception e) {
-      logger.warn("Close client pool failed", e);
-    }
+    pool.close();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
index 9d294c2454..60a40ee545 100644
--- a/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/client/ConfigNodeClient.java
@@ -27,7 +27,6 @@ import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
 import org.apache.iotdb.commons.client.ClientManager;
 import org.apache.iotdb.commons.client.ThriftClient;
 import org.apache.iotdb.commons.client.factory.ThriftClientFactory;
-import org.apache.iotdb.commons.client.property.ClientPoolProperty;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.client.sync.SyncThriftClientWithErrorHandler;
 import org.apache.iotdb.commons.consensus.ConfigNodeRegionId;
@@ -126,8 +125,6 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.commons.pool2.PooledObject;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
@@ -149,7 +146,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   private static final int RETRY_INTERVAL_MS = 1000;
 
-  private long connectionTimeout = ClientPoolProperty.DefaultProperty.WAIT_CLIENT_TIMEOUT_MS;
+  private long connectionTimeout;
 
   private IConfigNodeRPCService.Iface client;
 
@@ -171,23 +168,13 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   TProtocolFactory protocolFactory;
 
-  public ConfigNodeClient() throws TException {
-    // Read config nodes from configuration
-    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
-    protocolFactory =
-        IoTDBDescriptor.getInstance().getConf().isDnRpcThriftCompressionEnable()
-            ? new TCompactProtocol.Factory()
-            : new TBinaryProtocol.Factory();
-
-    init();
-  }
-
   public ConfigNodeClient(
+      List<TEndPoint> configNodes,
       TProtocolFactory protocolFactory,
       long connectionTimeout,
       ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager)
       throws TException {
-    configNodes = ConfigNodeInfo.getInstance().getLatestConfigNodes();
+    this.configNodes = configNodes;
     this.protocolFactory = protocolFactory;
     this.connectionTimeout = connectionTimeout;
     this.clientManager = clientManager;
@@ -282,11 +269,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
 
   @Override
   public void close() {
-    if (clientManager != null) {
-      clientManager.returnClient(configNodeRegionId, this);
-    } else {
-      invalidate();
-    }
+    clientManager.returnClient(configNodeRegionId, this);
   }
 
   @Override
@@ -1940,6 +1923,7 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
   }
 
   public static class Factory extends ThriftClientFactory<ConfigNodeRegionId, ConfigNodeClient> {
+
     public Factory(
         ClientManager<ConfigNodeRegionId, ConfigNodeClient> clientManager,
         ThriftClientProperty thriftClientProperty) {
@@ -1959,7 +1943,8 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
           SyncThriftClientWithErrorHandler.newErrorHandler(
               ConfigNodeClient.class,
               ConfigNodeClient.class.getConstructor(
-                  TProtocolFactory.class, long.class, clientManager.getClass()),
+                  List.class, TProtocolFactory.class, long.class, clientManager.getClass()),
+              ConfigNodeInfo.getInstance().getLatestConfigNodes(),
               thriftClientProperty.getProtocolFactory(),
               thriftClientProperty.getConnectionTimeoutMs(),
               clientManager));
@@ -1968,7 +1953,9 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
     @Override
     public boolean validateObject(
         ConfigNodeRegionId configNodeRegionId, PooledObject<ConfigNodeClient> pooledObject) {
-      return pooledObject.getObject() != null && pooledObject.getObject().getTransport().isOpen();
+      return Optional.ofNullable(pooledObject.getObject().getTransport())
+          .map(TTransport::isOpen)
+          .orElse(false);
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index f0d356a609..f212976cf6 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TNodeResource;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBDefaultThreadExceptionHandler;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
@@ -54,6 +55,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TRuntimeConfiguration;
 import org.apache.iotdb.confignode.rpc.thrift.TSystemConfigurationResp;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -101,6 +103,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
 
 public class DataNode implements DataNodeMBean {
+
   private static final Logger logger = LoggerFactory.getLogger(DataNode.class);
 
   private static final CommonConfig COMMON_CONFIG = CommonDescriptor.getInstance().getConf();
@@ -235,10 +238,11 @@ public class DataNode implements DataNodeMBean {
     int retry = DEFAULT_RETRY;
     TSystemConfigurationResp configurationResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         configurationResp = configNodeClient.getSystemConfiguration();
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn(
             "Cannot pull system configurations from ConfigNode-leader, because: {}",
@@ -357,10 +361,11 @@ public class DataNode implements DataNodeMBean {
     req.setClusterName(IOTDB_CONFIG.getClusterName());
     TDataNodeRegisterResp dataNodeRegisterResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeRegisterResp = configNodeClient.registerDataNode(req);
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn("Cannot register to the cluster, because: {}", e.getMessage());
         ConfigNodeInfo.getInstance().loadConfigNodeList();
@@ -416,10 +421,11 @@ public class DataNode implements DataNodeMBean {
     req.setDataNodeConfiguration(generateDataNodeConfiguration());
     TDataNodeRestartResp dataNodeRestartResp = null;
     while (retry > 0) {
-      try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+      try (ConfigNodeClient configNodeClient =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeRestartResp = configNodeClient.restartDataNode(req);
         break;
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         // Read ConfigNodes from system.properties and retry
         logger.warn(
             "Cannot send restart request to the ConfigNode-leader, because: {}", e.getMessage());
@@ -669,7 +675,8 @@ public class DataNode implements DataNodeMBean {
   }
 
   private void getJarOfUDFs(List<UDFInformation> udfInformationList) throws StartupException {
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       List<String> jarNameList =
           udfInformationList.stream().map(UDFInformation::getJarName).collect(Collectors.toList());
       TGetJarInListResp resp = configNodeClient.getUDFJar(new TGetJarInListReq(jarNameList));
@@ -681,7 +688,7 @@ public class DataNode implements DataNodeMBean {
         UDFExecutableManager.getInstance()
             .saveToInstallDir(jarList.get(i), udfInformationList.get(i).getJarName());
       }
-    } catch (IOException | TException e) {
+    } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
     }
   }
@@ -778,7 +785,8 @@ public class DataNode implements DataNodeMBean {
 
   private void getJarOfTriggers(List<TriggerInformation> triggerInformationList)
       throws StartupException {
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       List<String> jarNameList =
           triggerInformationList.stream()
               .map(TriggerInformation::getJarName)
@@ -792,7 +800,7 @@ public class DataNode implements DataNodeMBean {
         TriggerExecutableManager.getInstance()
             .saveToInstallDir(jarList.get(i), triggerInformationList.get(i).getJarName());
       }
-    } catch (IOException | TException e) {
+    } catch (IOException | TException | ClientManagerException e) {
       throw new StartupException(e);
     }
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
index 2b254b2296..a1a6d1621a 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNodeServerCommandLine.java
@@ -22,12 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.ServerCommandLine;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.exception.BadNodeUrlException;
 import org.apache.iotdb.commons.exception.IoTDBException;
 import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeRemoveResp;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -91,7 +93,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
    * @param args id or ip:rpc_port for removed datanode
    */
   private void doRemoveDataNode(String[] args)
-      throws BadNodeUrlException, TException, IoTDBException {
+      throws BadNodeUrlException, TException, IoTDBException, ClientManagerException {
 
     if (args.length != 2) {
       LOGGER.info("Usage: <node-id>/<ip>:<rpc-port>");
@@ -109,7 +111,8 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     }
     LOGGER.info("Start to remove datanode, removed datanode endpoints: {}", dataNodeLocations);
     TDataNodeRemoveReq removeReq = new TDataNodeRemoveReq(dataNodeLocations);
-    try (ConfigNodeClient configNodeClient = new ConfigNodeClient()) {
+    try (ConfigNodeClient configNodeClient =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       TDataNodeRemoveResp removeResp = configNodeClient.removeDataNode(removeReq);
       LOGGER.info("Remove result {} ", removeResp);
       if (removeResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -145,17 +148,19 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
     // Below supports multiple datanode deletion, split by ',', and is reserved for extension
     try {
       List<TEndPoint> endPoints = NodeUrlUtils.parseTEndPointUrls(args);
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
+      try (ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         dataNodeLocations =
             client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
                 .map(TDataNodeConfiguration::getLocation)
                 .filter(location -> endPoints.contains(location.getClientRpcEndPoint()))
                 .collect(Collectors.toList());
-      } catch (TException e) {
+      } catch (TException | ClientManagerException e) {
         LOGGER.error("Get data node locations failed", e);
       }
     } catch (BadNodeUrlException e) {
-      try (ConfigNodeClient client = new ConfigNodeClient()) {
+      try (ConfigNodeClient client =
+          ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
         for (String id : args.split(",")) {
           if (!isNumeric(id)) {
             LOGGER.warn("Incorrect id format {}, skipped...", id);
@@ -174,7 +179,7 @@ public class DataNodeServerCommandLine extends ServerCommandLine {
             dataNodeLocations.add(nodeLocationResult.get(0));
           }
         }
-      } catch (TException e1) {
+      } catch (TException | ClientManagerException e1) {
         LOGGER.error("Get data node locations failed", e);
       }
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 327efc3787..b027be04ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -34,6 +35,8 @@ import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.client.ConfigNodeInfo;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
 import org.apache.iotdb.db.engine.StorageEngine;
@@ -50,6 +53,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class RegionMigrateService implements IService {
+
   private static final Logger LOGGER = LoggerFactory.getLogger(RegionMigrateService.class);
 
   public static final String REGION_MIGRATE_PROCESS = "[REGION_MIGRATE_PROCESS]";
@@ -153,6 +157,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class RegionMigratePool extends AbstractPoolManager {
+
     private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
 
     private RegionMigratePool() {
@@ -178,6 +183,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class AddRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);
 
     // The RegionGroup that shall perform the add peer process
@@ -266,6 +272,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class RemoveRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(RemoveRegionPeerTask.class);
 
     private final TConsensusGroupId tRegionId;
@@ -354,6 +361,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class DeleteOldRegionPeerTask implements Runnable {
+
     private static final Logger taskLogger = LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
 
     private final TConsensusGroupId tRegionId;
@@ -453,6 +461,7 @@ public class RegionMigrateService implements IService {
   }
 
   private static class Holder {
+
     private static final RegionMigrateService INSTANCE = new RegionMigrateService();
 
     private Holder() {}
@@ -497,9 +506,10 @@ public class RegionMigrateService implements IService {
   }
 
   private static void reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
-      throws TException {
+      throws TException, ClientManagerException {
     TSStatus status;
-    try (ConfigNodeClient client = new ConfigNodeClient()) {
+    try (ConfigNodeClient client =
+        ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.configNodeRegionId)) {
       status = client.reportRegionMigrateResult(req);
       LOGGER.info(
           "{}, Report region {} migrate result {} to Config node succeed, result: {}",