You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/17 05:47:12 UTC

[iotdb] 04/05: simplify dataclientfactory

This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d5c9e7ccf7f7318992c8a8f31bf7b393a9d8828b
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Aug 14 21:23:59 2021 +0800

    simplify dataclientfactory
---
 .../main/java/org/apache/iotdb/cluster/ClientMain.java   | 16 +++++++---------
 .../apache/iotdb/cluster/client/sync/SyncDataClient.java | 15 ++++++---------
 .../cluster/client/sync/SyncDataHeartbeatClient.java     | 10 +++-------
 .../apache/iotdb/cluster/client/sync/SyncMetaClient.java | 12 ++++--------
 .../cluster/client/sync/SyncMetaHeartbeatClient.java     | 10 +++-------
 .../main/java/org/apache/iotdb/jdbc/IoTDBConnection.java | 11 +----------
 .../apache/iotdb/db/sync/sender/transfer/SyncClient.java | 14 +++-----------
 .../java/org/apache/iotdb/rpc/RpcTransportFactory.java   | 10 ++++++++++
 .../java/org/apache/iotdb/session/SessionConnection.java |  9 +--------
 9 files changed, 38 insertions(+), 69 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index cf30381..f56f699 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -19,6 +19,12 @@
 
 package org.apache.iotdb.cluster;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.jdbc.Config;
@@ -43,18 +49,10 @@ import org.apache.iotdb.session.SessionDataSet;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -277,7 +275,7 @@ public class ClientMain {
   @SuppressWarnings({"java:S2095"}) // the transport is used later
   private static Client getClient(String ip, int port) throws TTransportException {
     TSIService.Client.Factory factory = new Factory();
-    TTransport transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(ip, port));
+    TTransport transport = RpcTransportFactory.INSTANCE.getTransportWithNoTimeout(ip, port);
     transport.open();
     TProtocol protocol =
         IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index 487b087..764afdf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -24,11 +24,9 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
 import org.apache.iotdb.db.utils.TestOnly;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TimeoutChangeableTransport;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 import java.io.Closeable;
@@ -45,22 +43,21 @@ public class SyncDataClient extends Client implements Closeable {
   Node node;
   SyncClientPool pool;
 
-  public SyncDataClient(TProtocol prot) {
+  SyncDataClient(TProtocol prot) {
     super(prot);
   }
 
-  public SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
+  private SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
       throws TTransportException {
 
     // the difference of the two clients lies in the port
     super(
         protocolFactory.getProtocol(
             RpcTransportFactory.INSTANCE.getTransport(
-                new TSocket(
-                    TConfigurationConst.defaultTConfiguration,
-                    node.getInternalIp(),
-                    node.getDataPort(),
-                    ClusterConstant.getConnectionTimeoutInMS()))));
+                node.getInternalIp(),
+                node.getDataPort(),
+                ClusterConstant.getConnectionTimeoutInMS())));
+
     this.node = node;
     this.pool = pool;
     getInputProtocol().getTransport().open();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index a95c44a..4f6c259 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 /**
@@ -40,11 +38,9 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
     super(
         protocolFactory.getProtocol(
             RpcTransportFactory.INSTANCE.getTransport(
-                new TSocket(
-                    TConfigurationConst.defaultTConfiguration,
-                    node.getInternalIp(),
-                    node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
-                    ClusterConstant.getConnectionTimeoutInMS()))));
+                node.getInternalIp(),
+                node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+                ClusterConstant.getConnectionTimeoutInMS())));
     this.node = node;
     this.pool = pool;
     getInputProtocol().getTransport().open();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 7b77837..b5b3615 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -24,10 +24,8 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 import java.io.Closeable;
@@ -47,16 +45,14 @@ public class SyncMetaClient extends Client implements Closeable {
     super(prot);
   }
 
-  public SyncMetaClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
+  private SyncMetaClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
       throws TTransportException {
     super(
         protocolFactory.getProtocol(
             RpcTransportFactory.INSTANCE.getTransport(
-                new TSocket(
-                    TConfigurationConst.defaultTConfiguration,
-                    node.getInternalIp(),
-                    node.getMetaPort(),
-                    ClusterConstant.getConnectionTimeoutInMS()))));
+                node.getInternalIp(),
+                node.getMetaPort(),
+                ClusterConstant.getConnectionTimeoutInMS())));
     this.node = node;
     this.pool = pool;
     getInputProtocol().getTransport().open();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index 451897b..929db23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
 import org.apache.iotdb.cluster.utils.ClusterUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransportException;
 
 /**
@@ -40,11 +38,9 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
     super(
         protocolFactory.getProtocol(
             RpcTransportFactory.INSTANCE.getTransport(
-                new TSocket(
-                    TConfigurationConst.defaultTConfiguration,
-                    node.getInternalIp(),
-                    node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
-                    ClusterConstant.getConnectionTimeoutInMS()))));
+                node.getInternalIp(),
+                node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
+                ClusterConstant.getConnectionTimeoutInMS())));
     this.node = node;
     this.pool = pool;
     getInputProtocol().getTransport().open();
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 3362edd..a0c696c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.jdbc;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.service.rpc.thrift.ServerProperties;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -30,12 +29,9 @@ import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
 import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
-import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -75,7 +71,6 @@ public class IoTDBConnection implements Connection {
   private boolean isClosed = true;
   private SQLWarning warningChain = null;
   private TTransport transport;
-  private TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
   /**
    * Timeout of query can be set by users. Unit: s If not set, default value 0 will be used, which
    * will use server configuration.
@@ -453,11 +448,7 @@ public class IoTDBConnection implements Connection {
     RpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
     transport =
         RpcTransportFactory.INSTANCE.getTransport(
-            new TSocket(
-                tConfiguration,
-                params.getHost(),
-                params.getPort(),
-                Config.DEFAULT_CONNECTION_TIMEOUT_MS));
+            params.getHost(), params.getPort(), Config.DEFAULT_CONNECTION_TIMEOUT_MS);
     if (!transport.isOpen()) {
       transport.open();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 8acd63e..066338f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.sync.sender.transfer;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.db.concurrent.ThreadName;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -37,14 +38,9 @@ import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
 import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
 import org.apache.iotdb.db.utils.SyncUtils;
 import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TSocketWrapper;
 import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
 import org.apache.iotdb.service.sync.thrift.SyncService;
 import org.apache.iotdb.service.sync.thrift.SyncStatus;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.thrift.TConfiguration;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -135,8 +131,6 @@ public class SyncClient implements ISyncClient {
     init();
   }
 
-  private TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
-
   public static SyncClient getInstance() {
     return InstanceHolder.INSTANCE;
   }
@@ -297,9 +291,7 @@ public class SyncClient implements ISyncClient {
     RpcTransportFactory.setDefaultBufferCapacity(ioTDBConfig.getThriftDefaultBufferSize());
     RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
     try {
-      transport =
-          RpcTransportFactory.INSTANCE.getTransport(
-              TSocketWrapper.wrap(tConfiguration, serverIp, serverPort, TIMEOUT_MS));
+      transport = RpcTransportFactory.INSTANCE.getTransport(serverIp, serverPort, TIMEOUT_MS);
       TProtocol protocol;
       if (ioTDBConfig.isRpcThriftCompressionEnable()) {
         protocol = new TCompactProtocol(transport);
@@ -595,7 +587,7 @@ public class SyncClient implements ISyncClient {
   }
 
   /**
-   * Make snapshot<hard link> for new tsfile and its .restore file.
+   * Make snapshot hard link for new tsfile and its .restore file.
    *
    * @param file new tsfile to be synced
    */
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index c2c4fb2..c7d2b13 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.rpc;
 
+import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
@@ -48,6 +49,15 @@ public class RpcTransportFactory extends TTransportFactory {
     return inner.getTransport(trans);
   }
 
+  public TTransport getTransportWithNoTimeout(String ip, int port) throws TTransportException {
+    return inner.getTransport(new TSocket(TConfigurationConst.defaultTConfiguration, ip, port));
+  }
+
+  public TTransport getTransport(String ip, int port, int timeout) throws TTransportException {
+    return inner.getTransport(
+        new TSocket(TConfigurationConst.defaultTConfiguration, ip, port, timeout));
+  }
+
   public static boolean isUseSnappy() {
     return USE_SNAPPY;
   }
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 79b9967..6f0fcd1 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.rpc.RedirectException;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.service.rpc.thrift.EndPoint;
 import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
 import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
@@ -49,11 +48,9 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
 import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
@@ -105,11 +102,7 @@ public class SessionConnection {
       transport =
           RpcTransportFactory.INSTANCE.getTransport(
               // as there is a try-catch already, we do not need to use TSocket.wrap
-              new TSocket(
-                  TConfigurationConst.defaultTConfiguration,
-                  endPoint.getIp(),
-                  endPoint.getPort(),
-                  session.connectionTimeoutInMs));
+              endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs);
       transport.open();
     } catch (TTransportException e) {
       throw new IoTDBConnectionException(e);