You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/17 05:47:12 UTC
[iotdb] 04/05: simplify dataclientfactory
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d5c9e7ccf7f7318992c8a8f31bf7b393a9d8828b
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Aug 14 21:23:59 2021 +0800
simplify dataclientfactory
---
.../main/java/org/apache/iotdb/cluster/ClientMain.java | 16 +++++++---------
.../apache/iotdb/cluster/client/sync/SyncDataClient.java | 15 ++++++---------
.../cluster/client/sync/SyncDataHeartbeatClient.java | 10 +++-------
.../apache/iotdb/cluster/client/sync/SyncMetaClient.java | 12 ++++--------
.../cluster/client/sync/SyncMetaHeartbeatClient.java | 10 +++-------
.../main/java/org/apache/iotdb/jdbc/IoTDBConnection.java | 11 +----------
.../apache/iotdb/db/sync/sender/transfer/SyncClient.java | 14 +++-----------
.../java/org/apache/iotdb/rpc/RpcTransportFactory.java | 10 ++++++++++
.../java/org/apache/iotdb/session/SessionConnection.java | 9 +--------
9 files changed, 38 insertions(+), 69 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index cf30381..f56f699 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -19,6 +19,12 @@
package org.apache.iotdb.cluster;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.jdbc.Config;
@@ -43,18 +49,10 @@ import org.apache.iotdb.session.SessionDataSet;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.DefaultParser;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -277,7 +275,7 @@ public class ClientMain {
@SuppressWarnings({"java:S2095"}) // the transport is used later
private static Client getClient(String ip, int port) throws TTransportException {
TSIService.Client.Factory factory = new Factory();
- TTransport transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(ip, port));
+ TTransport transport = RpcTransportFactory.INSTANCE.getTransportWithNoTimeout(ip, port);
transport.open();
TProtocol protocol =
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index 487b087..764afdf 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -24,11 +24,9 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import java.io.Closeable;
@@ -45,22 +43,21 @@ public class SyncDataClient extends Client implements Closeable {
Node node;
SyncClientPool pool;
- public SyncDataClient(TProtocol prot) {
+ SyncDataClient(TProtocol prot) {
super(prot);
}
- public SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
+ private SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
super(
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- node.getInternalIp(),
- node.getDataPort(),
- ClusterConstant.getConnectionTimeoutInMS()))));
+ node.getInternalIp(),
+ node.getDataPort(),
+ ClusterConstant.getConnectionTimeoutInMS())));
+
this.node = node;
this.pool = pool;
getInputProtocol().getTransport().open();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index a95c44a..4f6c259 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
/**
@@ -40,11 +38,9 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
super(
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- node.getInternalIp(),
- node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getConnectionTimeoutInMS()))));
+ node.getInternalIp(),
+ node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
+ ClusterConstant.getConnectionTimeoutInMS())));
this.node = node;
this.pool = pool;
getInputProtocol().getTransport().open();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 7b77837..b5b3615 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -24,10 +24,8 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import java.io.Closeable;
@@ -47,16 +45,14 @@ public class SyncMetaClient extends Client implements Closeable {
super(prot);
}
- public SyncMetaClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
+ private SyncMetaClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
throws TTransportException {
super(
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- node.getInternalIp(),
- node.getMetaPort(),
- ClusterConstant.getConnectionTimeoutInMS()))));
+ node.getInternalIp(),
+ node.getMetaPort(),
+ ClusterConstant.getConnectionTimeoutInMS())));
this.node = node;
this.pool = pool;
getInputProtocol().getTransport().open();
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index 451897b..929db23 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -23,9 +23,7 @@ import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
/**
@@ -40,11 +38,9 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
super(
protocolFactory.getProtocol(
RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- node.getInternalIp(),
- node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
- ClusterConstant.getConnectionTimeoutInMS()))));
+ node.getInternalIp(),
+ node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
+ ClusterConstant.getConnectionTimeoutInMS())));
this.node = node;
this.pool = pool;
getInputProtocol().getTransport().open();
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 3362edd..a0c696c 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.jdbc;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.rpc.thrift.ServerProperties;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSIService;
@@ -30,12 +29,9 @@ import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
-import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -75,7 +71,6 @@ public class IoTDBConnection implements Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TTransport transport;
- private TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
/**
* Timeout of query can be set by users. Unit: s If not set, default value 0 will be used, which
* will use server configuration.
@@ -453,11 +448,7 @@ public class IoTDBConnection implements Connection {
RpcTransportFactory.setThriftMaxFrameSize(params.getThriftMaxFrameSize());
transport =
RpcTransportFactory.INSTANCE.getTransport(
- new TSocket(
- tConfiguration,
- params.getHost(),
- params.getPort(),
- Config.DEFAULT_CONNECTION_TIMEOUT_MS));
+ params.getHost(), params.getPort(), Config.DEFAULT_CONNECTION_TIMEOUT_MS);
if (!transport.isOpen()) {
transport.open();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index 8acd63e..066338f 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.sync.sender.transfer;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -37,14 +38,9 @@ import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
-import org.apache.iotdb.rpc.TConfigurationConst;
-import org.apache.iotdb.rpc.TSocketWrapper;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.thrift.TConfiguration;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -135,8 +131,6 @@ public class SyncClient implements ISyncClient {
init();
}
- private TConfiguration tConfiguration = TConfigurationConst.defaultTConfiguration;
-
public static SyncClient getInstance() {
return InstanceHolder.INSTANCE;
}
@@ -297,9 +291,7 @@ public class SyncClient implements ISyncClient {
RpcTransportFactory.setDefaultBufferCapacity(ioTDBConfig.getThriftDefaultBufferSize());
RpcTransportFactory.setThriftMaxFrameSize(ioTDBConfig.getThriftMaxFrameSize());
try {
- transport =
- RpcTransportFactory.INSTANCE.getTransport(
- TSocketWrapper.wrap(tConfiguration, serverIp, serverPort, TIMEOUT_MS));
+ transport = RpcTransportFactory.INSTANCE.getTransport(serverIp, serverPort, TIMEOUT_MS);
TProtocol protocol;
if (ioTDBConfig.isRpcThriftCompressionEnable()) {
protocol = new TCompactProtocol(transport);
@@ -595,7 +587,7 @@ public class SyncClient implements ISyncClient {
}
/**
- * Make snapshot<hard link> for new tsfile and its .restore file.
+ * Make snapshot hard link for new tsfile and its .restore file.
*
* @param file new tsfile to be synced
*/
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index c2c4fb2..c7d2b13 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.rpc;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
@@ -48,6 +49,15 @@ public class RpcTransportFactory extends TTransportFactory {
return inner.getTransport(trans);
}
+ public TTransport getTransportWithNoTimeout(String ip, int port) throws TTransportException {
+ return inner.getTransport(new TSocket(TConfigurationConst.defaultTConfiguration, ip, port));
+ }
+
+ public TTransport getTransport(String ip, int port, int timeout) throws TTransportException {
+ return inner.getTransport(
+ new TSocket(TConfigurationConst.defaultTConfiguration, ip, port, timeout));
+ }
+
public static boolean isUseSnappy() {
return USE_SNAPPY;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 79b9967..6f0fcd1 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateAlignedTimeseriesReq;
@@ -49,11 +48,9 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
-
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
@@ -105,11 +102,7 @@ public class SessionConnection {
transport =
RpcTransportFactory.INSTANCE.getTransport(
// as there is a try-catch already, we do not need to use TSocket.wrap
- new TSocket(
- TConfigurationConst.defaultTConfiguration,
- endPoint.getIp(),
- endPoint.getPort(),
- session.connectionTimeoutInMs));
+ endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs);
transport.open();
} catch (TTransportException e) {
throw new IoTDBConnectionException(e);