You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/24 06:01:06 UTC

[iotdb] branch change_transport_resize_policy created (now f2218dd)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch change_transport_resize_policy
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at f2218dd  allow compress buffer to oversize maxLength during compression

This branch includes the following new commits:

     new f2218dd  allow compress buffer to oversize maxLength during compression

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[iotdb] 01/01: allow compress buffer to oversize maxLength during compression

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch change_transport_resize_policy
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f2218dd1f341c7058c4eb89073ab2d24ee3a9583
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 24 13:58:28 2020 +0800

    allow compress buffer to oversize maxLength during compression
---
 .../main/java/org/apache/iotdb/rpc/RpcUtils.java   |  6 +++
 .../rpc/TCompressedElasticFramedTransport.java     | 45 ++++++++++++++++------
 .../apache/iotdb/rpc/TElasticFramedTransport.java  | 41 ++++++++++----------
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   |  2 +-
 4 files changed, 61 insertions(+), 33 deletions(-)

diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index c9be448..5d3aa66 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -44,6 +44,12 @@ public class RpcUtils {
    * How big is the largest allowable frame? Defaults to 16MB.
    */
   public static final int DEFAULT_MAX_LENGTH = 16384000;
+  /**
+   * Hard upper bound on the size of a frame. A frame whose declared size exceeds this value is
+   * rejected, so that a corrupted length cannot trigger a huge buffer allocation and cause an
+   * OOM. The default value is 512MB.
+   */
+  public static final int FRAME_HARD_MAX_LENGTH = 536870912;
 
   private RpcUtils() {
     // util class
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 32b87b9..dba5e45 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -30,9 +30,14 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
   private TByteBuffer writeCompressBuffer;
   private TByteBuffer readCompressBuffer;
 
+  private static final long MIN_SHRINK_INTERVAL = 60_000L;
+  private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
+  private long lastShrinkTime;
+  private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+
   protected TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
-      int maxLength) {
-    super(underlying, initialBufferCapacity, maxLength);
+      int maxSoftLength) {
+    super(underlying, initialBufferCapacity, maxSoftLength);
     writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
     readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
   }
@@ -64,14 +69,30 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
     }
   }
 
-  private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
-    double expandFactor = 1.5;
-    double loadFactor = 0.6;
-    if (byteBuffer.getByteBuffer().capacity() < size) {
-      int newCap = (int) Math.min(size * expandFactor, maxLength);
-      byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
-    } else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
-      byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
+  private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer)
+      throws TTransportException {
+    if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
+      close();
+      throw new TTransportException(TTransportException.CORRUPTED_DATA,
+          "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+              + ")!");
+    }
+
+    final int currentCapacity = byteBuffer.getByteBuffer().capacity();
+    final double loadFactor = 0.6;
+    if (currentCapacity < size) {
+      // Increase by a factor of 1.5x
+      int growCapacity = currentCapacity + (currentCapacity >> 1);
+      int newCapacity = Math.max(growCapacity, size);
+      byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCapacity));
+      bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+    } else if (currentCapacity > maxSoftLength && currentCapacity * loadFactor > size
+        && bufTooLargeCounter-- <= 0
+        && System.currentTimeMillis() - lastShrinkTime > MIN_SHRINK_INTERVAL) {
+      // only shrink a buffer that has grown beyond maxSoftLength, and do not shrink too often
+      byteBuffer = new TByteBuffer(ByteBuffer.allocate(size + (currentCapacity - size) / 2));
+      lastShrinkTime = System.currentTimeMillis();
+      bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
     }
     return byteBuffer;
   }
@@ -95,8 +116,8 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
     }
 
     writeBuffer.reset();
-    if (maxLength < length) {
-      writeBuffer.resizeIfNecessary(maxLength);
+    if (maxSoftLength < length) {
+      writeBuffer.resizeIfNecessary(maxSoftLength);
     }
     underlying.flush();
   }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 6c7773c..4dd288b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -28,17 +28,10 @@ import org.apache.thrift.transport.TTransportFactory;
 
 public class TElasticFramedTransport extends TTransport {
 
-  /**
-   * It is used to prevent the size of the parsing package from being too large and allocating the
-   * buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
-   * reading. The default value is 512MB
-   */
-  private static final int PROTECT_MAX_LENGTH = 536870912;
-
   public static class Factory extends TTransportFactory {
 
     protected final int initialCapacity;
-    protected final int maxLength;
+    protected final int maxSoftLength;
 
     public Factory() {
       this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
@@ -48,14 +41,14 @@ public class TElasticFramedTransport extends TTransport {
       this(initialCapacity, DEFAULT_MAX_LENGTH);
     }
 
-    public Factory(int initialCapacity, int maxLength) {
+    public Factory(int initialCapacity, int maxSoftLength) {
       this.initialCapacity = initialCapacity;
-      this.maxLength = maxLength;
+      this.maxSoftLength = maxSoftLength;
     }
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TElasticFramedTransport(trans, initialCapacity, maxLength);
+      return new TElasticFramedTransport(trans, initialCapacity, maxSoftLength);
     }
   }
 
@@ -63,14 +56,21 @@ public class TElasticFramedTransport extends TTransport {
     this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
   }
 
-  public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+  public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxSoftLength) {
     this.underlying = underlying;
-    this.maxLength = maxLength;
+    this.maxSoftLength = maxSoftLength;
     readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
     writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
   }
 
-  protected final int maxLength;
+  /**
+   * The capacity of the underlying buffer is allowed to exceed maxSoftLength, but if adjacent
+   * requests all have sizes smaller than maxSoftLength, the underlying buffer will be shrunk
+   * beneath maxSoftLength.
+   * The shrinking is limited at most once per minute to reduce overhead when maxSoftLength is
+   * set unreasonably or the workload naturally contains both very large and very small requests.
+   */
+  protected final int maxSoftLength;
   protected final TTransport underlying;
   protected AutoScalingBufferReadTransport readBuffer;
   protected AutoScalingBufferWriteTransport writeBuffer;
@@ -113,14 +113,15 @@ public class TElasticFramedTransport extends TTransport {
           "Read a negative frame size (" + size + ")!");
     }
 
-    if (size > PROTECT_MAX_LENGTH) {
+    if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
       close();
       throw new TTransportException(TTransportException.CORRUPTED_DATA,
-          "Frame size (" + size + ") larger than protect max length (" + PROTECT_MAX_LENGTH + ")!");
+          "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+              + ")!");
     }
 
-    if (size < maxLength) {
-      readBuffer.resizeIfNecessary(maxLength);
+    if (size < maxSoftLength) {
+      readBuffer.resizeIfNecessary(maxSoftLength);
     }
     readBuffer.fill(underlying, size);
   }
@@ -132,8 +133,8 @@ public class TElasticFramedTransport extends TTransport {
     underlying.write(i32buf, 0, 4);
     underlying.write(writeBuffer.getBuf().array(), 0, length);
     writeBuffer.reset();
-    if (length > maxLength) {
-      writeBuffer.resizeIfNecessary(maxLength);
+    if (length > maxSoftLength) {
+      writeBuffer.resizeIfNecessary(maxSoftLength);
     }
     underlying.flush();
   }
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 06f056c..5c4ed6f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -43,7 +43,7 @@ public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTrans
 
     @Override
     public TTransport getTransport(TTransport trans) {
-      return new TSnappyElasticFramedTransport(trans, initialCapacity, maxLength);
+      return new TSnappyElasticFramedTransport(trans, initialCapacity, maxSoftLength);
     }
   }