You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/11/30 12:48:07 UTC

[iotdb] branch cluster_add_snappy created (now 2f90321)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 2f90321  add snappy in transport

This branch includes the following new commits:

     new 2f90321  add snappy in transport

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: add snappy in transport

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2f90321a5e1b3ecfcbbe16f0d637e046d6f2c100
Author: jt <jt...@163.com>
AuthorDate: Mon Nov 30 20:46:33 2020 +0800

    add snappy in transport
---
 .../rpcutils/AutoScalingBufferReadTransport.java   |  4 ++
 .../client/rpcutils/TElasticFramedTransport.java   | 53 ++++++++++++++++++++--
 2 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
index b2e21b6..7a564e9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/AutoScalingBufferReadTransport.java
@@ -38,4 +38,8 @@ public class AutoScalingBufferReadTransport extends AutoExpandingBufferReadTrans
   public void shrinkSizeIfNecessary(int size) {
     buf.shrinkSizeIfNecessary(size);
   }
+
+  public void expandIfNecessary(int size) {
+    buf.resizeIfNecessary(size);
+  }
 }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java b/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
index ab5e8b8..a68d804 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/client/rpcutils/TElasticFramedTransport.java
@@ -18,14 +18,21 @@
  */
 package org.apache.iotdb.cluster.client.rpcutils;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.thrift.transport.TByteBuffer;
 import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
+import org.xerial.snappy.Snappy;
 
 public class TElasticFramedTransport extends TFastFramedTransport {
 
+  private TByteBuffer writeCompressBuffer;
+  private TByteBuffer readCompressBuffer;
+
   public static class Factory extends TTransportFactory {
 
     private final int initialCapacity;
@@ -60,6 +67,8 @@ public class TElasticFramedTransport extends TFastFramedTransport {
     this.maxLength = maxLength;
     readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity, 1.5);
     writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity, 1.5);
+    writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
+    readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
 
   private final int maxLength;
@@ -90,18 +99,52 @@ public class TElasticFramedTransport extends TFastFramedTransport {
       throw new TTransportException(TTransportException.CORRUPTED_DATA,
           "Read a negative frame size (" + size + ")!");
     }
-    if (size < maxLength) {
-      readBuffer.shrinkSizeIfNecessary(maxLength);
-    }
+
     readBuffer.fill(underlying, size);
+    try {
+      int uncompressedLength = Snappy.uncompressedLength(readBuffer.getBuffer(), 0, size);
+      readCompressBuffer = resizeCompressBuf(uncompressedLength, readCompressBuffer);
+      Snappy.uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getByteBuffer().array(), 0);
+      readCompressBuffer.getByteBuffer().limit(uncompressedLength);
+
+      if (uncompressedLength < maxLength) {
+        readBuffer.shrinkSizeIfNecessary(maxLength);
+      }
+      readBuffer.fill(readBuffer, uncompressedLength);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+  }
+
+  private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
+    double expandFactor = 1.5;
+    double loadFactor = 0.5;
+    if (byteBuffer.getByteBuffer().capacity() < size) {
+      int newCap = (int) Math.min(size * expandFactor, maxLength);
+      byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
+    } else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
+      byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
+    }
+    return byteBuffer;
   }
 
   @Override
   public void flush() throws TTransportException {
     int length = writeBuffer.getPos();
     TFramedTransport.encodeFrameSize(length, i32buf);
-    underlying.write(i32buf, 0, 4);
-    underlying.write(writeBuffer.getBuf().array(), 0, length);
+    try {
+      int maxCompressedLength = Snappy.maxCompressedLength(length);
+      writeCompressBuffer = resizeCompressBuf(maxCompressedLength, writeCompressBuffer);
+      int compressedLength = Snappy.compress(writeBuffer.getBuf().array(), 0, length,
+          writeCompressBuffer.getByteBuffer().array(), 0);
+      TFramedTransport.encodeFrameSize(compressedLength, i32buf);
+      underlying.write(i32buf, 0, 4);
+
+      underlying.write(writeCompressBuffer.getByteBuffer().array(), 0, compressedLength);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+
     writeBuffer.reset();
     writeBuffer.shrinkSizeIfNecessary(maxLength);
     underlying.flush();