You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 08:41:14 UTC
[iotdb] branch cluster_add_snappy updated: refactor extended transports
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
new 5a1189e refactor extended transports
5a1189e is described below
commit 5a1189e8826efa0754e0208aca30e5d5298507a4
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 16:39:56 2020 +0800
refactor extended transports
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 9 ++++++
.../rpc/TCompressedElasticFramedTransport.java | 34 ++++++++++++++++------
.../apache/iotdb/rpc/TElasticFramedTransport.java | 21 +++++++++++--
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 13 ++-------
4 files changed, 56 insertions(+), 21 deletions(-)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 7131009..1a84a18 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -31,6 +31,15 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
public class RpcUtils {
+ /**
+ * How big should the default read and write buffers be?
+ */
+ public static final int DEFAULT_BUF_CAPACITY = 64 * 1024;
+ /**
+ * How big is the largest allowable frame? Defaults to 16MB.
+ */
+ public static final int DEFAULT_MAX_LENGTH = 16384000;
+
private RpcUtils() {
// util class
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index de6f79a..171408e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -18,26 +18,33 @@
*/
package org.apache.iotdb.rpc;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.thrift.transport.TByteBuffer;
-import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-public abstract class TCompressedElasticFramedTransport extends TFastFramedTransport {
+public abstract class TCompressedElasticFramedTransport extends TTransport {
private TByteBuffer writeCompressBuffer;
private TByteBuffer readCompressBuffer;
+ private final int maxLength;
+ final TTransport underlying;
+ private AutoScalingBufferReadTransport readBuffer;
+ private AutoScalingBufferWriteTransport writeBuffer;
+ private final byte[] i32buf = new byte[4];
+
public TCompressedElasticFramedTransport(TTransport underlying) {
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
}
public TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
int maxLength) {
- super(underlying, initialBufferCapacity, maxLength);
this.underlying = underlying;
this.maxLength = maxLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
@@ -46,12 +53,6 @@ public abstract class TCompressedElasticFramedTransport extends TFastFramedTrans
readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
}
- private final int maxLength;
- private final TTransport underlying;
- private AutoScalingBufferReadTransport readBuffer;
- private AutoScalingBufferWriteTransport writeBuffer;
- private final byte[] i32buf = new byte[4];
-
@Override
public int read(byte[] buf, int off, int len) throws TTransportException {
int got = readBuffer.read(buf, off, len);
@@ -142,4 +143,19 @@ public abstract class TCompressedElasticFramedTransport extends TFastFramedTrans
protected abstract void uncompress(byte[] input, int inOff, int size, byte[] output,
int outOff) throws IOException;
+
+ @Override
+ public boolean isOpen() {
+ return underlying.isOpen();
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ underlying.open();
+ }
+
+ @Override
+ public void close() {
+ underlying.close();
+ }
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index c2eaa1f..158228d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -18,13 +18,16 @@
*/
package org.apache.iotdb.rpc;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;
+
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-public class TElasticFramedTransport extends TFastFramedTransport {
+public class TElasticFramedTransport extends TTransport {
public static class Factory extends TTransportFactory {
@@ -55,7 +58,6 @@ public class TElasticFramedTransport extends TFastFramedTransport {
}
public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
- super(underlying, initialBufferCapacity, maxLength);
this.underlying = underlying;
this.maxLength = maxLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
@@ -69,6 +71,21 @@ public class TElasticFramedTransport extends TFastFramedTransport {
private final byte[] i32buf = new byte[4];
@Override
+ public boolean isOpen() {
+ return underlying.isOpen();
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ underlying.open();
+ }
+
+ @Override
+ public void close() {
+ underlying.close();
+ }
+
+ @Override
public int read(byte[] buf, int off, int len) throws TTransportException {
int got = readBuffer.read(buf, off, len);
if (got > 0) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 757e90c..15c5afe 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -18,6 +18,9 @@
*/
package org.apache.iotdb.rpc;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;
+
import java.io.IOException;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;
@@ -25,15 +28,6 @@ import org.xerial.snappy.Snappy;
public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport {
- /**
- * How big should the default read and write buffers be?
- */
- public static final int DEFAULT_BUF_CAPACITY = 4 * 1024 * 1024;
- /**
- * How big is the largest allowable frame? Defaults to 16MB.
- */
- public static final int DEFAULT_MAX_LENGTH = 16384000;
-
public static class Factory extends TTransportFactory {
private final int initialCapacity;
@@ -88,5 +82,4 @@ public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTrans
throws IOException {
Snappy.uncompress(input, inOff, size, output, outOff);
}
-
}