You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 08:41:14 UTC

[iotdb] branch cluster_add_snappy updated: refactor extended transports

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
     new 5a1189e  refactor extended transports
5a1189e is described below

commit 5a1189e8826efa0754e0208aca30e5d5298507a4
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 16:39:56 2020 +0800

    refactor extended transports
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  9 ++++++
 .../rpc/TCompressedElasticFramedTransport.java     | 34 ++++++++++++++++------
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 21 +++++++++++--
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   | 13 ++-------
 4 files changed, 56 insertions(+), 21 deletions(-)

diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index 7131009..1a84a18 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -31,6 +31,15 @@ import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 public class RpcUtils {
 
+  /**
+   * How big should the default read and write buffers be?
+   */
+  public static final int DEFAULT_BUF_CAPACITY = 64 * 1024;
+  /**
+   * How big is the largest allowable frame? Defaults to 16MB.
+   */
+  public static final int DEFAULT_MAX_LENGTH = 16384000;
+
   private RpcUtils() {
     // util class
   }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index de6f79a..171408e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -18,26 +18,33 @@
  */
 package org.apache.iotdb.rpc;
 
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import org.apache.thrift.transport.TByteBuffer;
-import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 
-public abstract class TCompressedElasticFramedTransport extends TFastFramedTransport {
+public abstract class TCompressedElasticFramedTransport extends TTransport {
 
   private TByteBuffer writeCompressBuffer;
   private TByteBuffer readCompressBuffer;
 
+  private final int maxLength;
+  final TTransport underlying;
+  private AutoScalingBufferReadTransport readBuffer;
+  private AutoScalingBufferWriteTransport writeBuffer;
+  private final byte[] i32buf = new byte[4];
+
   public TCompressedElasticFramedTransport(TTransport underlying) {
     this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
   }
 
   public TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
       int maxLength) {
-    super(underlying, initialBufferCapacity, maxLength);
     this.underlying = underlying;
     this.maxLength = maxLength;
     readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
@@ -46,12 +53,6 @@ public abstract class TCompressedElasticFramedTransport extends TFastFramedTrans
     readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
 
-  private final int maxLength;
-  private final TTransport underlying;
-  private AutoScalingBufferReadTransport readBuffer;
-  private AutoScalingBufferWriteTransport writeBuffer;
-  private final byte[] i32buf = new byte[4];
-
   @Override
   public int read(byte[] buf, int off, int len) throws TTransportException {
     int got = readBuffer.read(buf, off, len);
@@ -142,4 +143,19 @@ public abstract class TCompressedElasticFramedTransport extends TFastFramedTrans
 
   protected abstract void uncompress(byte[] input, int inOff, int size, byte[] output,
       int outOff) throws IOException;
+
+  @Override
+  public boolean isOpen() {
+    return underlying.isOpen();
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    underlying.open();
+  }
+
+  @Override
+  public void close() {
+    underlying.close();
+  }
 }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index c2eaa1f..158228d 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iotdb.rpc;
 
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;
+
 import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
 
-public class TElasticFramedTransport extends TFastFramedTransport {
+public class TElasticFramedTransport extends TTransport {
 
   public static class Factory extends TTransportFactory {
 
@@ -55,7 +58,6 @@ public class TElasticFramedTransport extends TFastFramedTransport {
   }
 
   public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
-    super(underlying, initialBufferCapacity, maxLength);
     this.underlying = underlying;
     this.maxLength = maxLength;
     readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
@@ -69,6 +71,21 @@ public class TElasticFramedTransport extends TFastFramedTransport {
   private final byte[] i32buf = new byte[4];
 
   @Override
+  public boolean isOpen() {
+    return underlying.isOpen();
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    underlying.open();
+  }
+
+  @Override
+  public void close() {
+    underlying.close();
+  }
+
+  @Override
   public int read(byte[] buf, int off, int len) throws TTransportException {
     int got = readBuffer.read(buf, off, len);
     if (got > 0) {
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 757e90c..15c5afe 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iotdb.rpc;
 
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_BUF_CAPACITY;
+import static org.apache.iotdb.rpc.RpcUtils.DEFAULT_MAX_LENGTH;
+
 import java.io.IOException;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportFactory;
@@ -25,15 +28,6 @@ import org.xerial.snappy.Snappy;
 
 public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTransport {
 
-  /**
-   * How big should the default read and write buffers be?
-   */
-  public static final int DEFAULT_BUF_CAPACITY = 4 * 1024 * 1024;
-  /**
-   * How big is the largest allowable frame? Defaults to 16MB.
-   */
-  public static final int DEFAULT_MAX_LENGTH = 16384000;
-
   public static class Factory extends TTransportFactory {
 
     private final int initialCapacity;
@@ -88,5 +82,4 @@ public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTrans
       throws IOException {
     Snappy.uncompress(input, inOff, size, output, outOff);
   }
-
 }