You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/01 06:53:00 UTC
[iotdb] branch cluster_add_snappy updated: refactor transport factory
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
new 0a42b77 refactor transport factory
0a42b77 is described below
commit 0a42b7716cf4dc99eb246505db11aee9e5fc4208
Author: jt <jt...@163.com>
AuthorDate: Tue Dec 1 14:51:31 2020 +0800
refactor transport factory
---
cluster/src/assembly/resources/conf/logback.xml | 9 ----
.../java/org/apache/iotdb/cluster/ClientMain.java | 4 +-
.../iotdb/cluster/client/sync/SyncDataClient.java | 9 ++--
.../client/sync/SyncDataHeartbeatClient.java | 4 +-
.../iotdb/cluster/client/sync/SyncMetaClient.java | 4 +-
.../client/sync/SyncMetaHeartbeatClient.java | 4 +-
.../apache/iotdb/cluster/server/ClientServer.java | 6 +--
.../apache/iotdb/cluster/server/RaftServer.java | 6 +--
.../cluster/server/heartbeat/HeartbeatServer.java | 7 +--
.../iotdb/cluster/server/member/RaftMember.java | 2 +-
.../apache/iotdb/cluster/utils/ClusterUtils.java | 7 +--
.../TimeoutChangeableTFastFramedTransportTest.java | 24 ----------
.../org/apache/iotdb/jdbc/IoTDBConnection.java | 16 +++++--
.../db/service/thrift/ThriftServiceThread.java | 4 +-
.../iotdb/db/sync/sender/transfer/SyncClient.java | 5 +-
.../org/apache/iotdb/rpc/AutoExpandingBuffer.java | 53 ++++++++++++++++++++++
.../iotdb/rpc}/AutoScalingBufferReadTransport.java | 21 +++------
.../rpc}/AutoScalingBufferWriteTransport.java | 26 ++++-------
.../org/apache/iotdb/rpc/RpcTransportFactory.java | 34 ++++++++------
.../apache/iotdb/rpc}/TElasticFramedTransport.java | 10 ++--
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 18 ++++----
.../TimeoutChangeableTFastFramedTransport.java | 16 +++++--
.../TimeoutChangeableTSnappyFramedTransport.java | 17 +++++--
.../iotdb/rpc/TimeoutChangeableTransport.java | 20 +++-----
.../apache/iotdb/session/SessionConnection.java | 4 +-
25 files changed, 173 insertions(+), 157 deletions(-)
diff --git a/cluster/src/assembly/resources/conf/logback.xml b/cluster/src/assembly/resources/conf/logback.xml
index 4b641f5..e81e6e5 100644
--- a/cluster/src/assembly/resources/conf/logback.xml
+++ b/cluster/src/assembly/resources/conf/logback.xml
@@ -27,7 +27,6 @@
<onMismatch>DENY</onMismatch>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_WARN">
<file>${IOTDB_HOME}/logs/log_warn.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -49,7 +48,6 @@
<onMismatch>DENY</onMismatch>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_INFO">
<file>${IOTDB_HOME}/logs/log_info.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -71,7 +69,6 @@
<onMismatch>DENY</onMismatch>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_DEBUG">
<file>${IOTDB_HOME}/logs/log_debug.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -93,7 +90,6 @@
<onMismatch>DENY</onMismatch>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_TRACE">
<file>${IOTDB_HOME}/logs/log_trace.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -115,7 +111,6 @@
<onMismatch>DENY</onMismatch>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.ConsoleAppender" name="FILE_STDOUT">
<Target>System.out</Target>
<encoder>
@@ -126,7 +121,6 @@
<level>INFO</level>
</filter>
</appender>
-
<logger level="info" name="org.apache.iotdb.db.service"/>
<logger level="info" name="org.apache.iotdb.db.conf"/>
<!-- a log appender that collect all log records whose level is greather than debug-->
@@ -149,7 +143,6 @@
<level>DEBUG</level>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_COST_MEASURE">
<file>${IOTDB_HOME}/logs/log_measure.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -169,7 +162,6 @@
<level>INFO</level>
</filter>
</appender>
-
<appender class="ch.qos.logback.core.rolling.RollingFileAppender" name="FILE_DYNAMIC_PARAMETER">
<file>${IOTDB_HOME}/logs/log_dynamic_adapter.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
@@ -189,7 +181,6 @@
<level>DEBUG</level>
</filter>
</appender>
-
<appender name="ASYNC_FILE_ERROR" class="ch.qos.logback.classic.AsyncAppender">
<queueSize>1024</queueSize>
<neverBlock>true</neverBlock>
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
index c339344..5da4e52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/ClientMain.java
@@ -36,11 +36,11 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.jdbc.Config;
import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
@@ -280,7 +280,7 @@ public class ClientMain {
@SuppressWarnings({"java:S2095"}) // the transport is used later
private static Client getClient(String ip, int port) throws TTransportException {
TSIService.Client.Factory factory = new Factory();
- TTransport transport = new TElasticFramedTransport(new TSocket(ip, port));
+ TTransport transport = RpcTransportFactory.INSTANCE.getTransport(new TSocket(ip, port));
transport.open();
TProtocol protocol =
IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable() ?
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
index 9dd4f48..23e084c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataClient.java
@@ -20,11 +20,12 @@
package org.apache.iotdb.cluster.client.sync;
import java.net.SocketException;
-import org.apache.iotdb.cluster.client.rpcutils.TimeoutChangeableTFastFramedTransport;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.TSDataService.Client;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TimeoutChangeableTransport;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
@@ -48,7 +49,7 @@ public class SyncDataClient extends Client {
public SyncDataClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
- super(protocolFactory.getProtocol(new TimeoutChangeableTFastFramedTransport(
+ super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
new TSocket(node.getIp(), node.getDataPort(), RaftServer.getConnectionTimeoutInMS()))));
this.node = node;
this.pool = pool;
@@ -57,13 +58,13 @@ public class SyncDataClient extends Client {
public void setTimeout(int timeout) {
// the same transport is used in both input and output
- ((TimeoutChangeableTFastFramedTransport) (getInputProtocol().getTransport()))
+ ((TimeoutChangeableTransport) (getInputProtocol().getTransport()))
.setTimeout(timeout);
}
@TestOnly
public int getTimeout() throws SocketException {
- return ((TimeoutChangeableTFastFramedTransport) getInputProtocol().getTransport()).getTimeOut();
+ return ((TimeoutChangeableTransport) getInputProtocol().getTransport()).getTimeOut();
}
public void putBack() {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
index d9b40ad..134ec11 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncDataHeartbeatClient.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.rpcutils.TimeoutChangeableTFastFramedTransport;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -36,7 +36,7 @@ public class SyncDataHeartbeatClient extends SyncDataClient {
private SyncDataHeartbeatClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
- super(protocolFactory.getProtocol(new TimeoutChangeableTFastFramedTransport(
+ super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
new TSocket(node.getIp(), node.getDataPort() + ClusterUtils.DATA_HEARTBEAT_PORT_OFFSET,
RaftServer.getConnectionTimeoutInMS()))));
this.node = node;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
index 7cec0f0..6e4d450 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaClient.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.TSMetaService.Client;
import org.apache.iotdb.cluster.server.RaftServer;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
@@ -45,7 +45,7 @@ public class SyncMetaClient extends Client {
public SyncMetaClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
throws TTransportException {
- super(protocolFactory.getProtocol(new TElasticFramedTransport(
+ super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
new TSocket(node.getIp(), node.getMetaPort(), RaftServer.getConnectionTimeoutInMS()))));
this.node = node;
this.pool = pool;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
index 4326cf9..dddc66f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/sync/SyncMetaHeartbeatClient.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.cluster.client.sync;
-import org.apache.iotdb.cluster.client.rpcutils.TimeoutChangeableTFastFramedTransport;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.RaftServer;
import org.apache.iotdb.cluster.utils.ClusterUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
@@ -36,7 +36,7 @@ public class SyncMetaHeartbeatClient extends SyncMetaClient {
private SyncMetaHeartbeatClient(TProtocolFactory protocolFactory, Node node, SyncClientPool pool)
throws TTransportException {
// the difference of the two clients lies in the port
- super(protocolFactory.getProtocol(new TimeoutChangeableTFastFramedTransport(
+ super(protocolFactory.getProtocol(RpcTransportFactory.INSTANCE.getTransport(
new TSocket(node.getIp(), node.getMetaPort() + ClusterUtils.META_HEARTBEAT_PORT_OFFSET,
RaftServer.getConnectionTimeoutInMS()))));
this.node = node;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
index 4c82c5b..f0c554e 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/ClientServer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iotdb.cluster.client.async.AsyncDataClient;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
import org.apache.iotdb.cluster.client.sync.SyncDataClient;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
@@ -55,6 +54,7 @@ import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSIService.Processor;
@@ -164,9 +164,7 @@ public class ClientServer extends TSServiceImpl {
poolArgs.processor(new Processor<>(this));
poolArgs.protocolFactory(protocolFactory);
// nonblocking server requests FramedTransport
- poolArgs.transportFactory(new TElasticFramedTransport.Factory(
- IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
- IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
poolServer = new TThreadPoolServer(poolArgs);
// mainly for handling client exit events
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
index 0f2363e..fe0cc63 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/RaftServer.java
@@ -25,7 +25,6 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
@@ -35,6 +34,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -202,9 +202,7 @@ public abstract class RaftServer implements RaftService.AsyncIface, RaftService.
poolArgs.processor(getProcessor());
poolArgs.protocolFactory(protocolFactory);
// async service requires FramedTransport
- poolArgs.transportFactory(new TElasticFramedTransport.Factory(
- IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
- IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
// run the thrift server in a separate thread so that the main thread is not blocked
return new TThreadedSelectorServer(poolArgs);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
index f3d494f..e236e1c 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/heartbeat/HeartbeatServer.java
@@ -25,14 +25,13 @@ import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.utils.ClusterUtils;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -190,9 +189,7 @@ public abstract class HeartbeatServer {
poolArgs.processor(getProcessor());
poolArgs.protocolFactory(heartbeatProtocolFactory);
// async service requires FramedTransport
- poolArgs.transportFactory(new TElasticFramedTransport.Factory(
- IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
- IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
return new THsHaServer(poolArgs);
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 555c0a3..245b135 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -107,7 +107,7 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings("java:S3077") // reference volatile is enough
public abstract class RaftMember {
- public static final boolean USE_LOG_DISPATCHER = true;
+ public static final boolean USE_LOG_DISPATCHER = false;
private static final String MSG_FORWARD_TIMEOUT = "{}: Forward {} to {} time out";
private static final String MSG_FORWARD_ERROR = "{}: encountered an error when forwarding {} to"
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
index 8346ddc..395ce03 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterUtils.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.iotdb.cluster.client.rpcutils.TElasticFramedTransport;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.exception.CheckConsistencyException;
@@ -37,11 +36,11 @@ import org.apache.iotdb.cluster.rpc.thrift.CheckStatusResponse;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.StartUpStatus;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
@@ -253,9 +252,7 @@ public class ClusterUtils {
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
// async service requires FramedTransport
- poolArgs.transportFactory(new TElasticFramedTransport.Factory(
- IoTDBDescriptor.getInstance().getConfig().getThriftInitBufferSize(),
- IoTDBDescriptor.getInstance().getConfig().getThriftMaxFrameSize()));
+ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
// run the thrift server in a separate thread so that the main thread is not blocked
return new TThreadPoolServer(poolArgs);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransportTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransportTest.java
deleted file mode 100644
index 3b00a88..0000000
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransportTest.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless [...]
- */
-
-package org.apache.iotdb.cluster.client.rpcutils;
-
-import static org.junit.Assert.*;
-
-import java.net.SocketException;
-import org.apache.thrift.transport.TSocket;
-import org.junit.Test;
-
-public class TimeoutChangeableTFastFramedTransportTest {
-
- @Test
- public void test() throws SocketException {
- TimeoutChangeableTFastFramedTransport transport;
- transport = new TimeoutChangeableTFastFramedTransport(new TSocket("localhost", 6667));
-
- assertEquals(0, transport.getTimeOut());
- transport.setTimeout(1000);
- assertEquals(1000, transport.getTimeOut());
- }
-}
\ No newline at end of file
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
index 85464cd..1f954e3 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java
@@ -38,13 +38,20 @@ import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.service.rpc.thrift.*;
+import org.apache.iotdb.service.rpc.thrift.ServerProperties;
+import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq;
+import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp;
+import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
+import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -407,8 +414,9 @@ public class IoTDBConnection implements Connection {
}
private void openTransport() throws TTransportException {
- transport = new TFastFramedTransport(new TSocket(params.getHost(), params.getPort(),
- Config.connectionTimeoutInMs));
+ transport = RpcTransportFactory.INSTANCE
+ .getTransport(new TSocket(params.getHost(), params.getPort(),
+ Config.connectionTimeoutInMs));
if (!transport.isOpen()) {
transport.open();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
index 7f650d3..51fabac 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/ThriftServiceThread.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.exception.runtime.RPCServiceException;
import org.apache.iotdb.db.utils.CommonUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -32,7 +33,6 @@ import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
-import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;
@@ -73,7 +73,7 @@ public class ThriftServiceThread extends Thread {
threadsName);
poolArgs.processor(processor);
poolArgs.protocolFactory(protocolFactory);
- poolArgs.transportFactory(new TFastFramedTransport.Factory());
+ poolArgs.transportFactory(RpcTransportFactory.INSTANCE);
poolServer = new TThreadPoolServer(poolArgs);
poolServer.setServerEventHandler(serverEventHandler);
} catch (TTransportException e) {
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
index a0e16ff..8da4491 100644
--- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
+++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java
@@ -68,6 +68,7 @@ import org.apache.iotdb.db.sync.sender.recover.ISyncSenderLogger;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogAnalyzer;
import org.apache.iotdb.db.sync.sender.recover.SyncSenderLogger;
import org.apache.iotdb.db.utils.SyncUtils;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.service.sync.thrift.ConfirmInfo;
import org.apache.iotdb.service.sync.thrift.SyncService;
import org.apache.iotdb.service.sync.thrift.SyncStatus;
@@ -75,7 +76,6 @@ import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -273,7 +273,8 @@ public class SyncClient implements ISyncClient {
@Override
public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException {
- transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
+ transport = RpcTransportFactory.INSTANCE
+ .getTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS));
TProtocol protocol = new TBinaryProtocol(transport);
serviceClient = new SyncService.Client(protocol);
try {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
new file mode 100644
index 0000000..52490eb
--- /dev/null
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.rpc;
+
+import java.util.Arrays;
+
+/**
+ * Helper class that wraps a byte[] so that it can expand and be reused. Users
+ * should call resizeIfNecessary to make sure the buffer has suitable capacity,
+ * and then use the array as needed. Note that the internal array will grow at a
+ * rate slightly faster than the requested capacity with the (untested)
+ * objective of avoiding expensive buffer allocations and copies.
+ */
+class AutoExpandingBuffer {
+ private byte[] array;
+
+ public AutoExpandingBuffer(int initialCapacity) {
+ this.array = new byte[initialCapacity];
+ }
+
+ public void resizeIfNecessary(int size) {
+ final int currentCapacity = this.array.length;
+ final double loadFactor = 0.3;
+ if (currentCapacity < size) {
+ // Increase by a factor of 1.5x
+ int growCapacity = currentCapacity + (currentCapacity >> 1);
+ int newCapacity = Math.max(growCapacity, size);
+ this.array = Arrays.copyOf(array, newCapacity);
+ } else if (array.length * loadFactor > size) {
+ array = Arrays.copyOf(array, size);
+ }
+ }
+
+ public byte[] array() {
+ return this.array;
+ }
+}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
similarity index 64%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
index 7a564e9..085e9e8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferReadTransport.java
@@ -17,29 +17,20 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
import org.apache.thrift.transport.AutoExpandingBufferReadTransport;
public class AutoScalingBufferReadTransport extends AutoExpandingBufferReadTransport {
- private final AutoScalingBuffer buf;
+ private final AutoExpandingBuffer buf;
- public AutoScalingBufferReadTransport(int initialCapacity, double overgrowthCoefficient) {
- super(initialCapacity, overgrowthCoefficient);
- this.buf = new AutoScalingBuffer(initialCapacity, overgrowthCoefficient);
+ public AutoScalingBufferReadTransport(int initialCapacity) {
+ super(initialCapacity);
+ this.buf = new AutoExpandingBuffer(initialCapacity);
}
- /**
- * shrink the buffer to the specific size
- *
- * @param size The size of the target you want to shrink to
- */
- public void shrinkSizeIfNecessary(int size) {
- buf.shrinkSizeIfNecessary(size);
- }
-
- public void expandIfNecessary(int size) {
+ public void resizeIfNecessary(int size) {
buf.resizeIfNecessary(size);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferWriteTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
similarity index 78%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferWriteTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
index f7024e6..26807fb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferWriteTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoScalingBufferWriteTransport.java
@@ -17,9 +17,8 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
-import org.apache.thrift.transport.AutoExpandingBuffer;
import org.apache.thrift.transport.TTransport;
/**
@@ -28,11 +27,11 @@ import org.apache.thrift.transport.TTransport;
*/
public class AutoScalingBufferWriteTransport extends TTransport {
- private final AutoScalingBuffer buf;
+ private final AutoExpandingBuffer buf;
private int pos;
- public AutoScalingBufferWriteTransport(int initialCapacity, double growthCoefficient) {
- this.buf = new AutoScalingBuffer(initialCapacity, growthCoefficient);
+ public AutoScalingBufferWriteTransport(int initialCapacity) {
+ this.buf = new AutoExpandingBuffer(initialCapacity);
this.pos = 0;
}
@@ -63,10 +62,6 @@ public class AutoScalingBufferWriteTransport extends TTransport {
pos += len;
}
- public AutoExpandingBuffer getBuf() {
- return buf;
- }
-
public int getPos() {
return pos;
}
@@ -75,12 +70,11 @@ public class AutoScalingBufferWriteTransport extends TTransport {
pos = 0;
}
- /**
- * shrink the buffer to the specific size
- *
- * @param size The size of the target you want to shrink to
- */
- public void shrinkSizeIfNecessary(int size) {
- buf.shrinkSizeIfNecessary(size);
+ public void resizeIfNecessary(int size) {
+ buf.resizeIfNecessary(size);
+ }
+
+ public AutoExpandingBuffer getBuf() {
+ return buf;
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
similarity index 51%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
copy to service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
index f4d9a95..6a30b7a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcTransportFactory.java
@@ -17,27 +17,31 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
-import java.net.SocketException;
-import org.apache.iotdb.db.utils.TestOnly;
-import org.apache.thrift.transport.TSocket;
+import org.apache.iotdb.rpc.TimeoutChangeableTFastFramedTransport.Factory;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
-public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport {
+/**
+ * Transport factory that hands every request to a wrapped {@link TTransportFactory}.
+ * {@link #INSTANCE} is the shared factory; its delegate is picked once from {@link #USE_SNAPPY}.
+ */
+public class RpcTransportFactory extends TTransportFactory {
- private TSocket underlying;
-
- public TimeoutChangeableTFastFramedTransport(TSocket underlying) {
- super(underlying);
- this.underlying = underlying;
+  // TODO: make it a config
+  public static final boolean USE_SNAPPY = true;
+  /** Shared instance, built once in the static initializer below. */
+  public static final RpcTransportFactory INSTANCE;
+  static {
+    INSTANCE = USE_SNAPPY ?
+        new RpcTransportFactory(new TimeoutChangeableTSnappyFramedTransport.Factory()) :
+        new RpcTransportFactory(new Factory());
}
- public void setTimeout(int timeout) {
- underlying.setTimeout(timeout);
+  /** Delegate that builds the transports; never reassigned after construction. */
+  private final TTransportFactory inner;
+
+  /**
+   * @param inner the factory every {@link #getTransport(TTransport)} call is forwarded to
+   */
+  public RpcTransportFactory(TTransportFactory inner) {
+    this.inner = inner;
}
- @TestOnly
- public int getTimeOut() throws SocketException {
- return underlying.getSocket().getSoTimeout();
+  /**
+   * Wraps {@code trans} using the delegate factory.
+   *
+   * @param trans the transport to wrap
+   * @return whatever the delegate factory returns for {@code trans}
+   */
+  @Override
+  public TTransport getTransport(TTransport trans) {
+    return inner.getTransport(trans);
}
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
similarity index 96%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
copy to service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 7ad4a8c..3fddcf1 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -65,8 +65,8 @@ public class TElasticFramedTransport extends TFastFramedTransport {
super(underlying, initialBufferCapacity, maxLength);
this.underlying = underlying;
this.maxLength = maxLength;
- readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity, 1.5);
- writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity, 1.5);
+ readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
+ writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
}
@@ -109,7 +109,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
readCompressBuffer.getByteBuffer().position(0);
if (uncompressedLength < maxLength) {
- readBuffer.shrinkSizeIfNecessary(maxLength);
+ readBuffer.resizeIfNecessary(maxLength);
}
readBuffer.fill(readCompressBuffer, uncompressedLength);
} catch (IOException e) {
@@ -147,7 +147,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
}
writeBuffer.reset();
- writeBuffer.shrinkSizeIfNecessary(maxLength);
+ writeBuffer.resizeIfNecessary(maxLength);
underlying.flush();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
similarity index 91%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 7ad4a8c..307b363 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -28,7 +28,7 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.xerial.snappy.Snappy;
-public class TElasticFramedTransport extends TFastFramedTransport {
+public class TSnappyElasticFramedTransport extends TFastFramedTransport {
private TByteBuffer writeCompressBuffer;
private TByteBuffer readCompressBuffer;
@@ -53,20 +53,20 @@ public class TElasticFramedTransport extends TFastFramedTransport {
@Override
public TTransport getTransport(TTransport trans) {
- return new TElasticFramedTransport(trans, initialCapacity, maxLength);
+ return new TSnappyElasticFramedTransport(trans, initialCapacity, maxLength);
}
}
- public TElasticFramedTransport(TTransport underlying) {
+ public TSnappyElasticFramedTransport(TTransport underlying) {
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
}
- public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+ public TSnappyElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
super(underlying, initialBufferCapacity, maxLength);
this.underlying = underlying;
this.maxLength = maxLength;
- readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity, 1.5);
- writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity, 1.5);
+ readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
+ writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
}
@@ -109,7 +109,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
readCompressBuffer.getByteBuffer().position(0);
if (uncompressedLength < maxLength) {
- readBuffer.shrinkSizeIfNecessary(maxLength);
+ readBuffer.resizeIfNecessary(maxLength);
}
readBuffer.fill(readCompressBuffer, uncompressedLength);
} catch (IOException e) {
@@ -147,7 +147,7 @@ public class TElasticFramedTransport extends TFastFramedTransport {
}
writeBuffer.reset();
- writeBuffer.shrinkSizeIfNecessary(maxLength);
+ writeBuffer.resizeIfNecessary(maxLength);
underlying.flush();
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
similarity index 76%
copy from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
copy to service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
index f4d9a95..4aa50a7 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTFastFramedTransport.java
@@ -17,13 +17,14 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
import java.net.SocketException;
-import org.apache.iotdb.db.utils.TestOnly;
import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
-public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport {
+public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport implements TimeoutChangeableTransport {
private TSocket underlying;
@@ -36,8 +37,15 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
underlying.setTimeout(timeout);
}
- @TestOnly
public int getTimeOut() throws SocketException {
return underlying.getSocket().getSoTimeout();
}
+
+  /**
+   * Builds {@link TimeoutChangeableTFastFramedTransport} instances. This is the factory that
+   * RpcTransportFactory selects when USE_SNAPPY is false.
+   */
+  public static class Factory extends TTransportFactory {
+
+    /**
+     * Wraps {@code trans} in the transport of the enclosing class.
+     *
+     * @param trans the raw transport to wrap
+     * @return a {@link TimeoutChangeableTFastFramedTransport} when {@code trans} is a
+     *     {@link TSocket}; otherwise a plain {@link TElasticFramedTransport}, which does not
+     *     implement {@link TimeoutChangeableTransport}
+     */
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      // Must build the enclosing class, not TimeoutChangeableTSnappyFramedTransport; otherwise
+      // this factory and the Snappy factory would produce the same transport.
+      if (trans instanceof TSocket) {
+        return new TimeoutChangeableTFastFramedTransport((TSocket) trans);
+      }
+      // The timeout-changeable wrapper needs a TSocket; fall back rather than fail on the cast.
+      return new TElasticFramedTransport(trans);
+    }
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
similarity index 68%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
index f4d9a95..b251f3f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TimeoutChangeableTFastFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTSnappyFramedTransport.java
@@ -17,17 +17,18 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
+package org.apache.iotdb.rpc;
import java.net.SocketException;
-import org.apache.iotdb.db.utils.TestOnly;
import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportFactory;
-public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTransport {
+public class TimeoutChangeableTSnappyFramedTransport extends TSnappyElasticFramedTransport implements TimeoutChangeableTransport {
private TSocket underlying;
- public TimeoutChangeableTFastFramedTransport(TSocket underlying) {
+ public TimeoutChangeableTSnappyFramedTransport(TSocket underlying) {
super(underlying);
this.underlying = underlying;
}
@@ -36,8 +37,14 @@ public class TimeoutChangeableTFastFramedTransport extends TElasticFramedTranspo
underlying.setTimeout(timeout);
}
- @TestOnly
public int getTimeOut() throws SocketException {
return underlying.getSocket().getSoTimeout();
}
+
+  /** Builds {@link TimeoutChangeableTSnappyFramedTransport} instances. */
+  public static class Factory extends TTransportFactory {
+
+    /**
+     * Wraps {@code trans} in the transport of the enclosing class.
+     *
+     * @param trans the raw transport to wrap
+     * @return a {@link TimeoutChangeableTSnappyFramedTransport} when {@code trans} is a
+     *     {@link TSocket}; otherwise a plain {@link TSnappyElasticFramedTransport}, which does
+     *     not implement {@link TimeoutChangeableTransport}
+     */
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      if (trans instanceof TSocket) {
+        return new TimeoutChangeableTSnappyFramedTransport((TSocket) trans);
+      }
+      // The timeout-changeable wrapper needs a TSocket; fall back rather than fail on the cast.
+      return new TSnappyElasticFramedTransport(trans);
+    }
+  }
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTransport.java
similarity index 61%
rename from cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBuffer.java
rename to service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTransport.java
index 3408a89..6d34d8d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TimeoutChangeableTransport.java
@@ -17,23 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.cluster.client.rpcutils;
-import java.util.Arrays;
-import org.apache.thrift.transport.AutoExpandingBuffer;
+package org.apache.iotdb.rpc;
-public class AutoScalingBuffer extends AutoExpandingBuffer {
+import java.net.SocketException;
- private byte[] array;
+public interface TimeoutChangeableTransport {
+ void setTimeout(int timeout);
- public AutoScalingBuffer(int initialCapacity, double growthCoefficient) {
- super(initialCapacity, growthCoefficient);
- this.array = new byte[initialCapacity];
- }
- public void shrinkSizeIfNecessary(int size) {
- if (array.length > size) {
- array = Arrays.copyOf(array, size);
- }
- }
+ int getTimeOut() throws SocketException;
+
}
diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index d5ed5eb..28b18eb 100644
--- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -23,6 +23,7 @@ import java.time.ZoneId;
import java.util.List;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RedirectException;
+import org.apache.iotdb.rpc.RpcTransportFactory;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.EndPoint;
@@ -47,7 +48,6 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
@@ -75,7 +75,7 @@ public class SessionConnection {
}
private void init(EndPoint endPoint) throws IoTDBConnectionException {
- transport = new TFastFramedTransport(
+ transport = RpcTransportFactory.INSTANCE.getTransport(
new TSocket(endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs));
try {