You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 02:14:30 UTC

[iotdb] branch cluster_add_snappy updated: fix buffer resize

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
     new c986c26  fix buffer resize
c986c26 is described below

commit c986c26bfcf616e903149b612ac2b23f38db0a2e
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 10:13:05 2020 +0800

    fix buffer resize
---
 .../org/apache/iotdb/rpc/AutoExpandingBuffer.java  | 10 +++-
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 58 +++-------------------
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   |  4 +-
 3 files changed, 19 insertions(+), 53 deletions(-)

diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
index 52490eb..4fe0963 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
@@ -28,7 +28,11 @@ import java.util.Arrays;
  * objective of avoiding expensive buffer allocations and copies.
  */
 class AutoExpandingBuffer {
+  // if resizeIfNecessary is called continuously with a small size for more than
+  // MAX_BUFFER_OVERSIZE_TIME times, we will shrink the buffer to reclaim space
+  private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
   private byte[] array;
+  private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
 
   public AutoExpandingBuffer(int initialCapacity) {
     this.array = new byte[initialCapacity];
@@ -36,14 +40,16 @@ class AutoExpandingBuffer {
 
   public void resizeIfNecessary(int size) {
     final int currentCapacity = this.array.length;
-    final double loadFactor = 0.3;
+    final double loadFactor = 0.6;
     if (currentCapacity < size) {
       // Increase by a factor of 1.5x
       int growCapacity = currentCapacity + (currentCapacity >> 1);
       int newCapacity = Math.max(growCapacity, size);
       this.array = Arrays.copyOf(array, newCapacity);
-    } else if (array.length * loadFactor > size) {
+      bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+    } else if (array.length * loadFactor > size && bufTooLargeCounter-- <= 0) {
       array = Arrays.copyOf(array, size);
+      bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
     }
   }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 3fddcf1..c2eaa1f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -18,21 +18,14 @@
  */
 package org.apache.iotdb.rpc;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.thrift.transport.TByteBuffer;
 import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.thrift.transport.TTransportFactory;
-import org.xerial.snappy.Snappy;
 
 public class TElasticFramedTransport extends TFastFramedTransport {
 
-  private TByteBuffer writeCompressBuffer;
-  private TByteBuffer readCompressBuffer;
-
   public static class Factory extends TTransportFactory {
 
     private final int initialCapacity;
@@ -67,8 +60,6 @@ public class TElasticFramedTransport extends TFastFramedTransport {
     this.maxLength = maxLength;
     readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
     writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
-    writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
-    readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
 
   private final int maxLength;
@@ -99,55 +90,22 @@ public class TElasticFramedTransport extends TFastFramedTransport {
       throw new TTransportException(TTransportException.CORRUPTED_DATA,
           "Read a negative frame size (" + size + ")!");
     }
-
-    readBuffer.fill(underlying, size);
-    try {
-      int uncompressedLength = Snappy.uncompressedLength(readBuffer.getBuffer(), 0, size);
-      readCompressBuffer = resizeCompressBuf(uncompressedLength, readCompressBuffer);
-      Snappy.uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getByteBuffer().array(), 0);
-      readCompressBuffer.getByteBuffer().limit(uncompressedLength);
-      readCompressBuffer.getByteBuffer().position(0);
-
-      if (uncompressedLength < maxLength) {
-        readBuffer.resizeIfNecessary(maxLength);
-      }
-      readBuffer.fill(readCompressBuffer, uncompressedLength);
-    } catch (IOException e) {
-      throw new TTransportException(e);
-    }
-  }
-
-  private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
-    double expandFactor = 1.5;
-    double loadFactor = 0.5;
-    if (byteBuffer.getByteBuffer().capacity() < size) {
-      int newCap = (int) Math.min(size * expandFactor, maxLength);
-      byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
-    } else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
-      byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
+    if (size < maxLength) {
+      readBuffer.resizeIfNecessary(maxLength);
     }
-    return byteBuffer;
+    readBuffer.fill(underlying, size);
   }
 
   @Override
   public void flush() throws TTransportException {
     int length = writeBuffer.getPos();
     TFramedTransport.encodeFrameSize(length, i32buf);
-    try {
-      int maxCompressedLength = Snappy.maxCompressedLength(length);
-      writeCompressBuffer = resizeCompressBuf(maxCompressedLength, writeCompressBuffer);
-      int compressedLength = Snappy.compress(writeBuffer.getBuf().array(), 0, length,
-          writeCompressBuffer.getByteBuffer().array(), 0);
-      TFramedTransport.encodeFrameSize(compressedLength, i32buf);
-      underlying.write(i32buf, 0, 4);
-
-      underlying.write(writeCompressBuffer.getByteBuffer().array(), 0, compressedLength);
-    } catch (IOException e) {
-      throw new TTransportException(e);
-    }
-
+    underlying.write(i32buf, 0, 4);
+    underlying.write(writeBuffer.getBuf().array(), 0, length);
     writeBuffer.reset();
-    writeBuffer.resizeIfNecessary(maxLength);
+    if (length > maxLength) {
+      writeBuffer.resizeIfNecessary(maxLength);
+    }
     underlying.flush();
   }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 426b759..17a90b8 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -146,7 +146,9 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
     }
 
     writeBuffer.reset();
-    writeBuffer.resizeIfNecessary(maxLength);
+    if (maxLength < length) {
+      writeBuffer.resizeIfNecessary(maxLength);
+    }
     underlying.flush();
   }