You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hx...@apache.org on 2021/08/17 05:47:10 UTC
[iotdb] 02/05: continue to refactor
This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch cluster-
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8fff2ebe4349b73fc9012b65fecfecb5abfb5f49
Author: xiangdong huang <sa...@gmail.com>
AuthorDate: Sat Aug 14 13:00:48 2021 +0800
continue to refactor
---
.../org/apache/iotdb/cluster/ClusterIoTDB.java | 158 +++++++++++----------
.../iotdb/cluster/client/DataClientProvider.java | 14 +-
.../cluster/client/async/AsyncDataClient.java | 5 +-
.../client/async/AsyncDataHeartbeatClient.java | 5 +-
.../cluster/client/async/AsyncMetaClient.java | 5 +-
.../client/async/AsyncMetaHeartbeatClient.java | 5 +-
.../iotdb/cluster/client/sync/SyncDataClient.java | 5 +-
.../client/sync/SyncDataHeartbeatClient.java | 5 +-
.../iotdb/cluster/client/sync/SyncMetaClient.java | 5 +-
.../client/sync/SyncMetaHeartbeatClient.java | 5 +-
.../cluster/server/member/DataGroupMember.java | 9 +-
.../cluster/server/member/MetaGroupMember.java | 12 +-
.../cluster/client/async/AsyncClientPoolTest.java | 14 +-
.../client/async/AsyncDataHeartbeatClientTest.java | 7 +-
.../cluster/client/async/AsyncMetaClientTest.java | 14 +-
.../client/async/AsyncMetaHeartbeatClientTest.java | 7 +-
.../cluster/client/sync/SyncDataClientTest.java | 10 +-
.../client/sync/SyncDataHeartbeatClientTest.java | 5 +-
.../cluster/client/sync/SyncMetaClientTest.java | 10 +-
.../client/sync/SyncMetaHeartbeatClientTest.java | 7 +-
.../resources/conf/iotdb-engine.properties | 1 +
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 7 +-
.../org/apache/iotdb/rpc/RpcTransportFactory.java | 24 ++--
23 files changed, 168 insertions(+), 171 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
index 0201c6c..eebc015 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClusterIoTDB.java
@@ -247,59 +247,10 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
public void activeStartNodeMode() {
try {
- stopRaftInfoReport();
-
- startServerCheck();
- preStartCustomize();
-
- iotdb.active();
- JMXService.registerMBean(this, mbeanName);
- // register MetaGroupMember. MetaGroupMember has the same position with "StorageEngine" in the
- // cluster moduel.
- // TODO fixme it is better to remove coordinator out of metaGroupEngine
-
- registerManager.register(metaGroupEngine);
- registerManager.register(dataGroupEngine);
-
- // rpc service initialize
- if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine);
- MetaRaftHeartBeatService.getInstance().initAsyncedServiceImpl(metaAsyncService);
- MetaRaftService.getInstance().initAsyncedServiceImpl(metaAsyncService);
- DataRaftService.getInstance().initAsyncedServiceImpl(dataGroupEngine);
- DataRaftHeartBeatService.getInstance().initAsyncedServiceImpl(dataGroupEngine);
- } else {
- MetaSyncService syncService = new MetaSyncService(metaGroupEngine);
- MetaRaftHeartBeatService.getInstance().initSyncedServiceImpl(syncService);
- MetaRaftService.getInstance().initSyncedServiceImpl(syncService);
- DataRaftService.getInstance().initSyncedServiceImpl(dataGroupEngine);
- DataRaftHeartBeatService.getInstance().initSyncedServiceImpl(dataGroupEngine);
- }
- // start RPC service
- logger.info("start Meta Heartbeat RPC service... ");
- registerManager.register(MetaRaftHeartBeatService.getInstance());
- // TODO: better to start the Meta RPC service untill the heartbeatservice has elected the
- // leader.
- // and quorum of followers have caught up.
- logger.info("start Meta RPC service... ");
- registerManager.register(MetaRaftService.getInstance());
-
+ preInitCluster();
metaGroupEngine.buildCluster();
- logger.info("start Data Heartbeat RPC service... ");
- registerManager.register(DataRaftHeartBeatService.getInstance());
- logger.info("start Data RPC service... ");
- registerManager.register(DataRaftService.getInstance());
- // RPC based DBA API
- registerManager.register(ClusterInfoServer.getInstance());
- // JMX based DBA API
- registerManager.register(ClusterMonitor.INSTANCE);
- // we must wait until the metaGroup established.
- // So that the ClusterRPCService can work.
- ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl();
- clusterRPCServiceImpl.setCoordinator(coordinator);
- clusterRPCServiceImpl.setExecutor(metaGroupEngine);
- ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl);
- registerManager.register(ClusterRPCService.getInstance());
+ postInitCluster();
+ startClientRPC();
} catch (StartupException
| StartUpCheckFailureException
| ConfigInconsistentException
@@ -309,33 +260,84 @@ public class ClusterIoTDB implements ClusterIoTDBMBean {
}
}
+ private void preInitCluster() throws StartupException {
+ stopRaftInfoReport();
+ startServerCheck();
+ preStartCustomize();
+ iotdb.active();
+ JMXService.registerMBean(this, mbeanName);
+ // register MetaGroupMember. MetaGroupMember has the same position as "StorageEngine" in the
+ // cluster module.
+ // TODO fixme it is better to remove coordinator out of metaGroupEngine
+
+ registerManager.register(metaGroupEngine);
+ registerManager.register(dataGroupEngine);
+
+ // rpc service initialize
+ if (ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
+ MetaAsyncService metaAsyncService = new MetaAsyncService(metaGroupEngine);
+ MetaRaftHeartBeatService.getInstance().initAsyncedServiceImpl(metaAsyncService);
+ MetaRaftService.getInstance().initAsyncedServiceImpl(metaAsyncService);
+ DataRaftService.getInstance().initAsyncedServiceImpl(dataGroupEngine);
+ DataRaftHeartBeatService.getInstance().initAsyncedServiceImpl(dataGroupEngine);
+ } else {
+ MetaSyncService syncService = new MetaSyncService(metaGroupEngine);
+ MetaRaftHeartBeatService.getInstance().initSyncedServiceImpl(syncService);
+ MetaRaftService.getInstance().initSyncedServiceImpl(syncService);
+ DataRaftService.getInstance().initSyncedServiceImpl(dataGroupEngine);
+ DataRaftHeartBeatService.getInstance().initSyncedServiceImpl(dataGroupEngine);
+ }
+ // start RPC service
+ logger.info("start Meta Heartbeat RPC service... ");
+ registerManager.register(MetaRaftHeartBeatService.getInstance());
+ // TODO: better to start the Meta RPC service only after the heartbeat service has elected the
+ // leader
+ // and a quorum of followers has caught up.
+ logger.info("start Meta RPC service... ");
+ registerManager.register(MetaRaftService.getInstance());
+ }
+
+ private void postInitCluster() throws StartupException, QueryProcessException {
+ logger.info("start Data Heartbeat RPC service... ");
+ registerManager.register(DataRaftHeartBeatService.getInstance());
+ logger.info("start Data RPC service... ");
+ registerManager.register(DataRaftService.getInstance());
+ // RPC based DBA API
+ registerManager.register(ClusterInfoServer.getInstance());
+ // JMX based DBA API
+ registerManager.register(ClusterMonitor.INSTANCE);
+ }
+
+ private void startClientRPC() throws QueryProcessException, StartupException {
+ // we must wait until the metaGroup established.
+ // So that the ClusterRPCService can work.
+ ClusterTSServiceImpl clusterRPCServiceImpl = new ClusterTSServiceImpl();
+ clusterRPCServiceImpl.setCoordinator(coordinator);
+ clusterRPCServiceImpl.setExecutor(metaGroupEngine);
+ ClusterRPCService.getInstance().initSyncedServiceImpl(clusterRPCServiceImpl);
+ registerManager.register(ClusterRPCService.getInstance());
+ }
+
public void activeAddNodeMode() {
- // try {
- // long startTime = System.currentTimeMillis();
- // metaServer = new RaftTSMetaServiceImpl();
- // preStartCustomize();
- // metaServer.start();
- // metaServer.joinCluster();
- // dataEngine.pullSnapshots();
- // // Currently, we do not register ClusterInfoService as a JMX Bean,
- // // so we use startService() rather than start()
- // ClusterInfoServer.getInstance().startService();
- // // JMX based DBA API
- // registerManager.register(ClusterMonitor.INSTANCE);
- // // finally, we start the RPC service
- // registerManager.register(ClusterRPCService.getInstance());
- // logger.info(
- // "Adding this node {} to cluster costs {} ms",
- // metaServer.getMember().getThisNode(),
- // (System.currentTimeMillis() - startTime));
- // } catch (TTransportException
- // | StartupException
- // | QueryProcessException
- // | StartUpCheckFailureException
- // | ConfigInconsistentException e) {
- // stop();
- // logger.error("Fail to join cluster", e);
- // }
+ try {
+ long startTime = System.currentTimeMillis();
+
+ preInitCluster();
+ metaGroupEngine.joinCluster();
+ postInitCluster();
+ dataGroupEngine.pullSnapshots();
+ startClientRPC();
+ logger.info(
+ "Adding this node {} to cluster costs {} ms",
+ thisNode,
+ (System.currentTimeMillis() - startTime));
+ } catch (StartupException
+ | QueryProcessException
+ | StartUpCheckFailureException
+ | ConfigInconsistentException e) {
+ stop();
+ logger.error("Fail to join cluster", e);
+ }
}
private void startServerCheck() throws StartupException {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
index 8b954ec..769150b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/DataClientProvider.java
@@ -21,13 +21,13 @@ package org.apache.iotdb.cluster.client;
import org.apache.iotdb.cluster.client.async.AsyncClientPool;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.FactoryAsync;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient.Factory;
import org.apache.iotdb.cluster.client.sync.SyncClientPool;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
-
+import org.apache.iotdb.db.utils.TestOnly;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -45,16 +45,18 @@ public class DataClientProvider {
public DataClientProvider(TProtocolFactory factory) {
if (!ClusterDescriptor.getInstance().getConfig().isUseAsyncServer()) {
- dataSyncClientPool = new SyncClientPool(new SyncDataClient.FactorySync(factory));
+ dataSyncClientPool = new SyncClientPool(new SyncDataClient.Factory(factory));
} else {
- dataAsyncClientPool = new AsyncClientPool(new FactoryAsync(factory));
+ dataAsyncClientPool = new AsyncClientPool(new Factory(factory));
}
}
+ @TestOnly
AsyncClientPool getDataAsyncClientPool() {
return dataAsyncClientPool;
}
+ @TestOnly
SyncClientPool getDataSyncClientPool() {
return dataSyncClientPool;
}
@@ -66,7 +68,7 @@ public class DataClientProvider {
* @param timeout timeout threshold of connection
*/
public AsyncDataClient getAsyncDataClient(Node node, int timeout) throws IOException {
- AsyncDataClient client = (AsyncDataClient) getDataAsyncClientPool().getClient(node);
+ AsyncDataClient client = (AsyncDataClient) dataAsyncClientPool.getClient(node);
if (client == null) {
throw new IOException("can not get client for node=" + node);
}
@@ -85,7 +87,7 @@ public class DataClientProvider {
* @param timeout timeout threshold of connection
*/
public SyncDataClient getSyncDataClient(Node node, int timeout) throws TException {
- SyncDataClient client = (SyncDataClient) getDataSyncClientPool().getClient(node);
+ SyncDataClient client = (SyncDataClient) dataSyncClientPool.getClient(node);
if (client == null) {
throw new TException("can not get client for node=" + node);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
index c6c8a3a..cad58ba 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataClient.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.AsyncClient;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -98,9 +97,9 @@ public class AsyncDataClient extends AsyncClient {
___currentMethod = null;
}
- public static class FactoryAsync extends AsyncClientFactory {
+ public static class Factory extends AsyncClientFactory {
- public FactoryAsync(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+ public Factory(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
index 6276b64..6bc280d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -53,9 +52,9 @@ public class AsyncDataHeartbeatClient extends AsyncDataClient {
this.pool = pool;
}
- public static class FactoryAsync extends AsyncClientFactory {
+ public static class Factory extends AsyncClientFactory {
- public FactoryAsync(TProtocolFactory protocolFactory) {
+ public Factory(TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
index d3f8a97..7082ab6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.AsyncClient;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.async.TAsyncMethodCall;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -89,9 +88,9 @@ public class AsyncMetaClient extends AsyncClient {
pool.onError(node);
}
- public static class FactoryAsync extends AsyncClientFactory {
+ public static class Factory extends AsyncClientFactory {
- public FactoryAsync(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+ public Factory(org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
index 40e1075..dcdb44e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.TNonblockingSocketWrapper;
-
import org.apache.thrift.async.TAsyncClientManager;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -53,9 +52,9 @@ public class AsyncMetaHeartbeatClient extends AsyncMetaClient {
this.pool = pool;
}
- public static class FactoryAsync extends AsyncClientFactory {
+ public static class Factory extends AsyncClientFactory {
- public FactoryAsync(TProtocolFactory protocolFactory) {
+ public Factory(TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index dcda206..487b087 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
import org.apache.iotdb.rpc.TimeoutChangeableTransport;
-
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
@@ -94,11 +93,11 @@ public class SyncDataClient extends Client implements Closeable {
putBack();
}
- public static class FactorySync implements SyncClientFactory {
+ public static class Factory implements SyncClientFactory {
private TProtocolFactory protocolFactory;
- public FactorySync(TProtocolFactory protocolFactory) {
+ public Factory(TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index 7024466..a95c44a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
-
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -51,11 +50,11 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
getInputProtocol().getTransport().open();
}
- public static class FactorySync implements SyncClientFactory {
+ public static class Factory implements SyncClientFactory {
private TProtocolFactory protocolFactory;
- public FactorySync(TProtocolFactory protocolFactory) {
+ public Factory(TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 28674b8..7b77837 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
-
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
@@ -77,11 +76,11 @@ public class SyncMetaClient extends Client implements Closeable {
putBack();
}
- public static class FactorySync implements SyncClientFactory {
+ public static class Factory implements SyncClientFactory {
private TProtocolFactory protocolFactory;
- public FactorySync(TProtocolFactory protocolFactory) {
+ public Factory(TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index ad19287..451897b 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.TConfigurationConst;
-
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -51,11 +50,11 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
getInputProtocol().getTransport().open();
}
- public static class FactorySync implements SyncClientFactory {
+ public static class Factory implements SyncClientFactory {
private TProtocolFactory protocolFactory;
- public FactorySync(TProtocolFactory protocolFactory) {
+ public Factory(TProtocolFactory protocolFactory) {
this.protocolFactory = protocolFactory;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 287f24a..d838944 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -95,7 +95,6 @@ import org.apache.iotdb.db.utils.TestOnly;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.utils.Pair;
-
import org.apache.thrift.protocol.TProtocolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -193,10 +192,10 @@ public class DataGroupMember extends RaftMember implements DataGroupMemberMBean
+ "-raftId-"
+ nodes.getId()
+ "",
- new AsyncClientPool(new AsyncDataClient.FactoryAsync(factory)),
- new SyncClientPool(new SyncDataClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncDataHeartbeatClient.FactoryAsync(factory)),
- new SyncClientPool(new SyncDataHeartbeatClient.FactorySync(factory)),
+ new AsyncClientPool(new AsyncDataClient.Factory(factory)),
+ new SyncClientPool(new SyncDataClient.Factory(factory)),
+ new AsyncClientPool(new AsyncDataHeartbeatClient.Factory(factory)),
+ new SyncClientPool(new SyncDataHeartbeatClient.Factory(factory)),
new AsyncClientPool(new SingleManagerFactory(factory)));
this.metaGroupMember = metaGroupMember;
allNodes = nodes;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
index 3029a18..59012a1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/MetaGroupMember.java
@@ -68,7 +68,6 @@ import org.apache.iotdb.cluster.server.Response;
import org.apache.iotdb.cluster.server.handlers.caller.GenericHandler;
import org.apache.iotdb.cluster.server.handlers.caller.NodeStatusHandler;
import org.apache.iotdb.cluster.server.heartbeat.MetaHeartbeatThread;
-import org.apache.iotdb.cluster.server.member.DataGroupMember.Factory;
import org.apache.iotdb.cluster.server.monitor.NodeReport.MetaMemberReport;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.cluster.server.monitor.Timer;
@@ -96,7 +95,6 @@ import org.apache.iotdb.db.utils.TimeValuePairUtils.Intervals;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
-
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TTransportException;
@@ -234,10 +232,10 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
public MetaGroupMember(TProtocolFactory factory, Node thisNode, Coordinator coordinator) {
super(
"Meta",
- new AsyncClientPool(new AsyncMetaClient.FactoryAsync(factory)),
- new SyncClientPool(new SyncMetaClient.FactorySync(factory)),
- new AsyncClientPool(new AsyncMetaHeartbeatClient.FactoryAsync(factory)),
- new SyncClientPool(new SyncMetaHeartbeatClient.FactorySync(factory)));
+ new AsyncClientPool(new AsyncMetaClient.Factory(factory)),
+ new SyncClientPool(new SyncMetaClient.Factory(factory)),
+ new AsyncClientPool(new AsyncMetaHeartbeatClient.Factory(factory)),
+ new SyncClientPool(new SyncMetaHeartbeatClient.Factory(factory)));
allNodes = new PartitionGroup();
initPeerMap();
@@ -252,7 +250,7 @@ public class MetaGroupMember extends RaftMember implements IService, MetaGroupMe
loadIdentifier();
allNodes.add(thisNode);
- Factory dataMemberFactory = new Factory(factory, this);
+ DataGroupMember.Factory dataMemberFactory = new DataGroupMember.Factory(factory, this);
startUpStatus = getNewStartUpStatus();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
index 3d1d335..99fe589 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncClientPoolTest.java
@@ -4,7 +4,7 @@
package org.apache.iotdb.cluster.client.async;
-import org.apache.iotdb.cluster.client.async.AsyncDataClient.FactoryAsync;
+import org.apache.iotdb.cluster.client.async.AsyncDataClient.Factory;
import org.apache.iotdb.cluster.common.TestAsyncClient;
import org.apache.iotdb.cluster.common.TestAsyncClientFactory;
import org.apache.iotdb.cluster.common.TestUtils;
@@ -12,9 +12,7 @@ import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -58,14 +56,14 @@ public class AsyncClientPoolTest {
@Test
public void testDataClient() throws IOException {
- testAsyncClientFactory = new FactoryAsync(new TBinaryProtocol.Factory());
+ testAsyncClientFactory = new Factory(new TBinaryProtocol.Factory());
getClient();
putClient();
}
@Test
public void testMetaClient() throws IOException {
- testAsyncClientFactory = new AsyncMetaClient.FactoryAsync(new TBinaryProtocol.Factory());
+ testAsyncClientFactory = new AsyncMetaClient.Factory(new TBinaryProtocol.Factory());
getClient();
putClient();
}
@@ -92,11 +90,11 @@ public class AsyncClientPoolTest {
for (int i = 0; i < 10; i++) {
asyncClientPool.putClient(TestUtils.getNode(i), testClients.get(i));
}
- } else if (testAsyncClientFactory instanceof AsyncMetaClient.FactoryAsync) {
+ } else if (testAsyncClientFactory instanceof AsyncMetaClient.Factory) {
for (AsyncClient testClient : testClients) {
((AsyncMetaClient) testClient).onComplete();
}
- } else if (testAsyncClientFactory instanceof FactoryAsync) {
+ } else if (testAsyncClientFactory instanceof Factory) {
for (AsyncClient testClient : testClients) {
((AsyncDataClient) testClient).onComplete();
}
@@ -198,7 +196,7 @@ public class AsyncClientPoolTest {
public void testRecreateClient() throws IOException {
testAsyncClientFactory = new TestAsyncClientFactory();
AsyncClientPool asyncClientPool =
- new AsyncClientPool(new AsyncMetaClient.FactoryAsync(new Factory()));
+ new AsyncClientPool(new AsyncMetaClient.Factory(new TBinaryProtocol.Factory()));
AsyncMetaClient client = (AsyncMetaClient) asyncClientPool.getClient(TestUtils.getNode(0));
client.onError(new Exception());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
index 392c69e..76ab0f7 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncDataHeartbeatClientTest.java
@@ -19,13 +19,11 @@
package org.apache.iotdb.cluster.client.async;
-import org.apache.iotdb.cluster.client.async.AsyncDataHeartbeatClient.FactoryAsync;
+import junit.framework.TestCase;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-
-import junit.framework.TestCase;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
@@ -51,7 +49,8 @@ public class AsyncDataHeartbeatClientTest extends TestCase {
@Test
public void test() throws IOException {
- FactoryAsync factoryAsync = new FactoryAsync(new Factory());
+ AsyncDataHeartbeatClient.Factory factoryAsync =
+ new AsyncDataHeartbeatClient.Factory(new Factory());
AsyncClient asyncClient = factoryAsync.getAsyncClient(TestUtils.getNode(0), null);
assertEquals(
"AsyncDataHeartbeatClient{node=Node(internalIp:192.168.0.0, metaPort:9003, nodeIdentifier:0, dataPort:40010, clientPort:6667, clientIp:0.0.0.0),dataHeartbeatPort=40011}",
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
index f424ec9..c1b73fc 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaClientTest.java
@@ -4,17 +4,15 @@
package org.apache.iotdb.cluster.client.async;
-import org.apache.iotdb.cluster.client.async.AsyncMetaClient.FactoryAsync;
+import org.apache.iotdb.cluster.client.async.AsyncMetaClient.Factory;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterConstant;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.apache.thrift.async.TAsyncClientManager;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.transport.TNonblockingSocket;
import org.junit.After;
import org.junit.Before;
@@ -22,7 +20,10 @@ import org.junit.Test;
import java.io.IOException;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class AsyncMetaClientTest {
@@ -42,12 +43,13 @@ public class AsyncMetaClientTest {
@Test
public void test() throws IOException, TException {
- AsyncClientPool asyncClientPool = new AsyncClientPool(new FactoryAsync(new Factory()));
+ AsyncClientPool asyncClientPool =
+ new AsyncClientPool(new Factory(new org.apache.thrift.protocol.TBinaryProtocol.Factory()));
AsyncMetaClient client;
Node node = TestUtils.getNode(0);
client =
new AsyncMetaClient(
- new Factory(),
+ new org.apache.thrift.protocol.TBinaryProtocol.Factory(),
new TAsyncClientManager(),
new TNonblockingSocket(
node.getInternalIp(),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
index 345b9fe..41ed581 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/async/AsyncMetaHeartbeatClientTest.java
@@ -19,13 +19,11 @@
package org.apache.iotdb.cluster.client.async;
-import org.apache.iotdb.cluster.client.async.AsyncMetaHeartbeatClient.FactoryAsync;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
-
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TBinaryProtocol;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -51,7 +49,8 @@ public class AsyncMetaHeartbeatClientTest {
@Test
public void test() throws IOException {
- FactoryAsync factoryAsync = new FactoryAsync(new Factory());
+ AsyncMetaHeartbeatClient.Factory factoryAsync =
+ new AsyncMetaHeartbeatClient.Factory(new TBinaryProtocol.Factory());
AsyncClient asyncClient = factoryAsync.getAsyncClient(TestUtils.getNode(0), null);
Assert.assertEquals(
"AsyncMetaHeartbeatClient{node=Node(internalIp:192.168.0.0, metaPort:9003, "
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
index 0356b19..5f0334e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataClientTest.java
@@ -4,13 +4,11 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncDataClient.FactorySync;
+import org.apache.iotdb.cluster.client.sync.SyncDataClient.Factory;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.rpc.TSocketWrapper;
-
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.Test;
import java.io.IOException;
@@ -41,7 +39,8 @@ public class SyncDataClientTest {
listenThread.start();
try {
- SyncClientPool syncClientPool = new SyncClientPool(new FactorySync(new Factory()));
+ SyncClientPool syncClientPool =
+ new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
SyncDataClient client;
client = (SyncDataClient) syncClientPool.getClient(node);
@@ -92,7 +91,8 @@ public class SyncDataClientTest {
listenThread.start();
try {
- SyncClientPool syncClientPool = new SyncClientPool(new FactorySync(new Factory()));
+ SyncClientPool syncClientPool =
+ new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
SyncDataClient clientOut;
try (SyncDataClient clientIn = (SyncDataClient) syncClientPool.getClient(node)) {
assertEquals(node, clientIn.getNode());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
index 29162e0..a03e6b5 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClientTest.java
@@ -19,9 +19,7 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncDataHeartbeatClient.FactorySync;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.transport.TTransportException;
import org.junit.Assert;
@@ -51,7 +49,8 @@ public class SyncDataHeartbeatClientTest {
listenThread.start();
try {
- FactorySync factoryAsync = new FactorySync(new Factory());
+ SyncDataHeartbeatClient.Factory factoryAsync =
+ new SyncDataHeartbeatClient.Factory(new Factory());
SyncDataHeartbeatClient syncClient = factoryAsync.getSyncClient(node, null);
Assert.assertEquals(
"SyncHeartbeatDataClient{node=Node(internalIp:localhost, metaPort:0, nodeIdentifier:0,"
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
index 9a7a8ce..d0d994f 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaClientTest.java
@@ -4,13 +4,11 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncMetaClient.FactorySync;
+import org.apache.iotdb.cluster.client.sync.SyncMetaClient.Factory;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.Client;
import org.apache.iotdb.rpc.TSocketWrapper;
-
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.Test;
import java.io.IOException;
@@ -41,7 +39,8 @@ public class SyncMetaClientTest {
listenThread.start();
try {
- SyncClientPool syncClientPool = new SyncClientPool(new FactorySync(new Factory()));
+ SyncClientPool syncClientPool =
+ new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
SyncMetaClient client;
client = (SyncMetaClient) syncClientPool.getClient(node);
@@ -84,7 +83,8 @@ public class SyncMetaClientTest {
listenThread.start();
try {
- SyncClientPool syncClientPool = new SyncClientPool(new FactorySync(new Factory()));
+ SyncClientPool syncClientPool =
+ new SyncClientPool(new Factory(new TBinaryProtocol.Factory()));
SyncMetaClient clientOut;
try (SyncMetaClient clientIn = (SyncMetaClient) syncClientPool.getClient(node); ) {
assertEquals(node, clientIn.getNode());
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
index 460ed82..146d182 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClientTest.java
@@ -19,10 +19,8 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.sync.SyncMetaHeartbeatClient.FactorySync;
import org.apache.iotdb.cluster.rpc.thrift.Node;
-
-import org.apache.thrift.protocol.TBinaryProtocol.Factory;
+import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TTransportException;
import org.junit.Assert;
import org.junit.Test;
@@ -51,7 +49,8 @@ public class SyncMetaHeartbeatClientTest {
listenThread.start();
try {
- FactorySync factoryAsync = new FactorySync(new Factory());
+ SyncMetaHeartbeatClient.Factory factoryAsync =
+ new SyncMetaHeartbeatClient.Factory(new TBinaryProtocol.Factory());
SyncMetaHeartbeatClient syncClient = factoryAsync.getSyncClient(node, null);
Assert.assertEquals(
"SyncMetaHeartbeatClient{node=Node(internalIp:localhost, metaPort:9003,"
diff --git a/server/src/assembly/resources/conf/iotdb-engine.properties b/server/src/assembly/resources/conf/iotdb-engine.properties
index 14c1330..faee6c2 100644
--- a/server/src/assembly/resources/conf/iotdb-engine.properties
+++ b/server/src/assembly/resources/conf/iotdb-engine.properties
@@ -45,6 +45,7 @@ rpc_port=6667
# if true, a snappy based compression method will be called before sending data by the network
# Datatype: boolean
+# This feature is still under development; keep it set to false until it is finished.
# rpc_advanced_compression_enable=false
# Datatype: int
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 599bc78..63c057e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -18,17 +18,17 @@
*/
package org.apache.iotdb.db.conf;
+import com.google.common.net.InetAddresses;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.utils.FilePathUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.fileSystem.FSType;
-
-import com.google.common.net.InetAddresses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -813,6 +813,9 @@ public class IoTDBDescriptor {
// set tsfile-format config
loadTsFileProps(properties);
+ // re-create RpcTransportFactory.INSTANCE so that the current settings take effect.
+ RpcTransportFactory.reInit();
+
// UDF
loadUDFProps(properties);
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index 1a70334..c2c4fb2 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -28,25 +28,18 @@ public class RpcTransportFactory extends TTransportFactory {
// TODO: make it a config
public static boolean USE_SNAPPY = false;
- public static final RpcTransportFactory INSTANCE;
+ public static RpcTransportFactory INSTANCE;
private static int thriftDefaultBufferSize = RpcUtils.THRIFT_DEFAULT_BUF_CAPACITY;
private static int thriftMaxFrameSize = RpcUtils.THRIFT_FRAME_MAX_SIZE;
static {
- INSTANCE =
- USE_SNAPPY
- ? new RpcTransportFactory(
- new TimeoutChangeableTSnappyFramedTransport.Factory(
- thriftDefaultBufferSize, thriftMaxFrameSize))
- : new RpcTransportFactory(
- new TimeoutChangeableTFastFramedTransport.Factory(
- thriftDefaultBufferSize, thriftMaxFrameSize));
+ reInit();
}
private TTransportFactory inner;
- public RpcTransportFactory(TTransportFactory inner) {
+ private RpcTransportFactory(TTransportFactory inner) {
this.inner = inner;
}
@@ -70,4 +63,15 @@ public class RpcTransportFactory extends TTransportFactory {
public static void setThriftMaxFrameSize(int thriftMaxFrameSize) {
RpcTransportFactory.thriftMaxFrameSize = thriftMaxFrameSize;
}
+
+ public static void reInit() {
+ INSTANCE =
+ USE_SNAPPY
+ ? new RpcTransportFactory(
+ new TimeoutChangeableTSnappyFramedTransport.Factory(
+ thriftDefaultBufferSize, thriftMaxFrameSize))
+ : new RpcTransportFactory(
+ new TimeoutChangeableTFastFramedTransport.Factory(
+ thriftDefaultBufferSize, thriftMaxFrameSize));
+ }
}