You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/24 06:01:07 UTC

[iotdb] 01/01: allow compress buffer to oversize maxLength during compression

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch change_transport_resize_policy
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f2218dd1f341c7058c4eb89073ab2d24ee3a9583
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 24 13:58:28 2020 +0800

    allow compress buffer to oversize maxLength during compression
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  6 +++
 .../rpc/TCompressedElasticFramedTransport.java     | 45 ++++++++++++++++------
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 41 ++++++++++----------
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   |  2 +-
 4 files changed, 61 insertions(+), 33 deletions(-)

diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index c9be448..5d3aa66 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -44,6 +44,12 @@ public class RpcUtils {
    * How big is the largest allowable frame? Defaults to 16MB.
    */
   public static final int DEFAULT_MAX_LENGTH = 16384000;
+  /**
+   * Hard upper limit on the frame size. It prevents a parsed frame size that is too large from
+   * causing a buffer allocation that leads to OOM, so the maximum length of the requested
+   * memory is limited when reading. The default value is 512MB.
+   */
+  public static final int FRAME_HARD_MAX_LENGTH = 536870912;
 
   private RpcUtils() {
     // util class
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 32b87b9..dba5e45 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -30,9 +30,14 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
   private TByteBuffer writeCompressBuffer;
   private TByteBuffer readCompressBuffer;
 
+  private static final long MIN_SHRINK_INTERVAL = 60_000L;
+  private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
+  private long lastShrinkTime;
+  private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+
   protected TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
-      int maxLength) {
-    super(underlying, initialBufferCapacity, maxLength);
+      int maxSoftLength) {
+    super(underlying, initialBufferCapacity, maxSoftLength);
     writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
     readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
@@ -64,14 +69,30 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
     }
   }
 
-  private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
-    double expandFactor = 1.5;
-    double loadFactor = 0.6;
-    if (byteBuffer.getByteBuffer().capacity() < size) {
-      int newCap = (int) Math.min(size * expandFactor, maxLength);
-      byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
-    } else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
-      byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
+  private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer)
+      throws TTransportException {
+    if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
+      close();
+      throw new TTransportException(TTransportException.CORRUPTED_DATA,
+          "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+              + ")!");
+    }
+
+    final int currentCapacity = byteBuffer.getByteBuffer().capacity();
+    final double loadFactor = 0.6;
+    if (currentCapacity < size) {
+      // Increase by a factor of 1.5x
+      int growCapacity = currentCapacity + (currentCapacity >> 1);
+      int newCapacity = Math.max(growCapacity, size);
+      byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCapacity));
+      bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+    } else if (currentCapacity > maxSoftLength && currentCapacity * loadFactor > size
+        && bufTooLargeCounter-- <= 0
+        && System.currentTimeMillis() - lastShrinkTime > MIN_SHRINK_INTERVAL) {
+      // only shrink a buffer that has grown beyond maxSoftLength, and do not shrink too often
+      byteBuffer = new TByteBuffer(ByteBuffer.allocate(size + (currentCapacity - size) / 2));
+      lastShrinkTime = System.currentTimeMillis();
+      bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
     }
     return byteBuffer;
   }
@@ -95,8 +116,8 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
     }
 
     writeBuffer.reset();
-    if (maxLength < length) {
-      writeBuffer.resizeIfNecessary(maxLength);
+    if (maxSoftLength < length) {
+      writeBuffer.resizeIfNecessary(maxSoftLength);
     }
     underlying.flush();
   }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 6c7773c..4dd288b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -28,17 +28,10 @@ import org.apache.thrift.transport.TTransportFactory;
 
 public class TElasticFramedTransport extends TTransport {
 
-  /**
-   * It is used to prevent the size of the parsing package from being too large and allocating the
-   * buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
-   * reading. The default value is 512MB
-   */
-  private static final int PROTECT_MAX_LENGTH = 536870912;
-
   public static class Factory extends TTransportFactory {
 
     protected final int initialCapacity;
-    protected final int maxLength;
+    protected final int maxSoftLength;
 
     public Factory() {
       this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
@@ -48,14 +41,14 @@ public class TElasticFramedTransport extends TTransport {
       this(initialCapacity, DEFAULT_MAX_LENGTH);
     }
 
-    public Factory(int initialCapacity, int maxLength) {
+    public Factory(int initialCapacity, int maxSoftLength) {
       this.initialCapacity = initialCapacity;
-      this.maxLength = maxLength;
+      this.maxSoftLength = maxSoftLength;
     }
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TElasticFramedTransport(trans, initialCapacity, maxLength);
+      return new TElasticFramedTransport(trans, initialCapacity, maxSoftLength);
     }
   }
 
@@ -63,14 +56,21 @@ public class TElasticFramedTransport extends TTransport {
     this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
   }
 
-  public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+  public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxSoftLength) {
     this.underlying = underlying;
-    this.maxLength = maxLength;
+    this.maxSoftLength = maxSoftLength;
     readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
     writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
   }
 
-  protected final int maxLength;
+  /**
+   * The capacity of the underlying buffer is allowed to exceed maxSoftLength, but if adjacent
+   * requests all have sizes smaller than maxSoftLength, the underlying buffer will be shrunk
+   * beneath maxSoftLength.
+   * The shrinking is limited at most once per minute to reduce overhead when maxSoftLength is
+   * set unreasonably or the workload naturally contains both very large and very small requests.
+   */
+  protected final int maxSoftLength;
   protected final TTransport underlying;
   protected AutoScalingBufferReadTransport readBuffer;
   protected AutoScalingBufferWriteTransport writeBuffer;
@@ -113,14 +113,15 @@ public class TElasticFramedTransport extends TTransport {
           "Read a negative frame size (" + size + ")!");
     }
 
-    if (size > PROTECT_MAX_LENGTH) {
+    if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
       close();
       throw new TTransportException(TTransportException.CORRUPTED_DATA,
-          "Frame size (" + size + ") larger than protect max length (" + PROTECT_MAX_LENGTH + ")!");
+          "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+              + ")!");
     }
 
-    if (size < maxLength) {
-      readBuffer.resizeIfNecessary(maxLength);
+    if (size < maxSoftLength) {
+      readBuffer.resizeIfNecessary(maxSoftLength);
     }
     readBuffer.fill(underlying, size);
   }
@@ -132,8 +133,8 @@ public class TElasticFramedTransport extends TTransport {
     underlying.write(i32buf, 0, 4);
     underlying.write(writeBuffer.getBuf().array(), 0, length);
     writeBuffer.reset();
-    if (length > maxLength) {
-      writeBuffer.resizeIfNecessary(maxLength);
+    if (length > maxSoftLength) {
+      writeBuffer.resizeIfNecessary(maxSoftLength);
     }
     underlying.flush();
   }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 06f056c..5c4ed6f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -43,7 +43,7 @@ public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTrans
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TSnappyElasticFramedTransport(trans, initialCapacity, maxLength);
+      return new TSnappyElasticFramedTransport(trans, initialCapacity, maxSoftLength);
     }
   }