You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/24 06:01:07 UTC
[iotdb] 01/01: allow compress buffer to oversize maxLength during
compression
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch change_transport_resize_policy
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f2218dd1f341c7058c4eb89073ab2d24ee3a9583
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 24 13:58:28 2020 +0800
allow compress buffer to oversize maxLength during compression
---
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 6 +++
.../rpc/TCompressedElasticFramedTransport.java | 45 ++++++++++++++++------
.../apache/iotdb/rpc/TElasticFramedTransport.java | 41 ++++++++++----------
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 2 +-
4 files changed, 61 insertions(+), 33 deletions(-)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index c9be448..5d3aa66 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -44,6 +44,12 @@ public class RpcUtils {
* How big is the largest allowable frame? Defaults to 16MB.
*/
public static final int DEFAULT_MAX_LENGTH = 16384000;
+ /**
+ * It is used to prevent the size of the parsing package from being too large and allocating the
+ * buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
+ * reading. The default value is 512MB
+ */
+ public static final int FRAME_HARD_MAX_LENGTH = 536870912;
private RpcUtils() {
// util class
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
index 32b87b9..dba5e45 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TCompressedElasticFramedTransport.java
@@ -30,9 +30,14 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
private TByteBuffer writeCompressBuffer;
private TByteBuffer readCompressBuffer;
+ private static final long MIN_SHRINK_INTERVAL = 60_000L;
+ private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
+ private long lastShrinkTime;
+ private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+
protected TCompressedElasticFramedTransport(TTransport underlying, int initialBufferCapacity,
- int maxLength) {
- super(underlying, initialBufferCapacity, maxLength);
+ int maxSoftLength) {
+ super(underlying, initialBufferCapacity, maxSoftLength);
writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
}
@@ -64,14 +69,30 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
}
}
- private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
- double expandFactor = 1.5;
- double loadFactor = 0.6;
- if (byteBuffer.getByteBuffer().capacity() < size) {
- int newCap = (int) Math.min(size * expandFactor, maxLength);
- byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
- } else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
- byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
+ private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer)
+ throws TTransportException {
+ if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
+ close();
+ throw new TTransportException(TTransportException.CORRUPTED_DATA,
+ "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+ + ")!");
+ }
+
+ final int currentCapacity = byteBuffer.getByteBuffer().capacity();
+ final double loadFactor = 0.6;
+ if (currentCapacity < size) {
+ // Increase by a factor of 1.5x
+ int growCapacity = currentCapacity + (currentCapacity >> 1);
+ int newCapacity = Math.max(growCapacity, size);
+ byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCapacity));
+ bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+ } else if (currentCapacity > maxSoftLength && currentCapacity * loadFactor > size
+ && bufTooLargeCounter-- <= 0
+ && System.currentTimeMillis() - lastShrinkTime > MIN_SHRINK_INTERVAL) {
+ // do not shrink beneath the initial size and do not shrink too often
+ byteBuffer = new TByteBuffer(ByteBuffer.allocate(size + (currentCapacity - size) / 2));
+ lastShrinkTime = System.currentTimeMillis();
+ bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
}
return byteBuffer;
}
@@ -95,8 +116,8 @@ public abstract class TCompressedElasticFramedTransport extends TElasticFramedTr
}
writeBuffer.reset();
- if (maxLength < length) {
- writeBuffer.resizeIfNecessary(maxLength);
+ if (maxSoftLength < length) {
+ writeBuffer.resizeIfNecessary(maxSoftLength);
}
underlying.flush();
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 6c7773c..4dd288b 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -28,17 +28,10 @@ import org.apache.thrift.transport.TTransportFactory;
public class TElasticFramedTransport extends TTransport {
- /**
- * It is used to prevent the size of the parsing package from being too large and allocating the
- * buffer will cause oom. Therefore, the maximum length of the requested memory is limited when
- * reading. The default value is 512MB
- */
- private static final int PROTECT_MAX_LENGTH = 536870912;
-
public static class Factory extends TTransportFactory {
protected final int initialCapacity;
- protected final int maxLength;
+ protected final int maxSoftLength;
public Factory() {
this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
@@ -48,14 +41,14 @@ public class TElasticFramedTransport extends TTransport {
this(initialCapacity, DEFAULT_MAX_LENGTH);
}
- public Factory(int initialCapacity, int maxLength) {
+ public Factory(int initialCapacity, int maxSoftLength) {
this.initialCapacity = initialCapacity;
- this.maxLength = maxLength;
+ this.maxSoftLength = maxSoftLength;
}
@Override
public TTransport getTransport(TTransport trans) {
- return new TElasticFramedTransport(trans, initialCapacity, maxLength);
+ return new TElasticFramedTransport(trans, initialCapacity, maxSoftLength);
}
}
@@ -63,14 +56,21 @@ public class TElasticFramedTransport extends TTransport {
this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
}
- public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+ public TElasticFramedTransport(TTransport underlying, int initialBufferCapacity, int maxSoftLength) {
this.underlying = underlying;
- this.maxLength = maxLength;
+ this.maxSoftLength = maxSoftLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
}
- protected final int maxLength;
+ /**
+ * The capacity of the underlying buffer is allowed to exceed maxSoftLength, but if adjacent
+ * requests all have sizes smaller than maxSoftLength, the underlying buffer will be shrunk
+ * beneath maxSoftLength.
+ * The shrinking is limited at most once per minute to reduce overhead when maxSoftLength is
+ * set unreasonably or the workload naturally contains both ver large and very small requests.
+ */
+ protected final int maxSoftLength;
protected final TTransport underlying;
protected AutoScalingBufferReadTransport readBuffer;
protected AutoScalingBufferWriteTransport writeBuffer;
@@ -113,14 +113,15 @@ public class TElasticFramedTransport extends TTransport {
"Read a negative frame size (" + size + ")!");
}
- if (size > PROTECT_MAX_LENGTH) {
+ if (size > RpcUtils.FRAME_HARD_MAX_LENGTH) {
close();
throw new TTransportException(TTransportException.CORRUPTED_DATA,
- "Frame size (" + size + ") larger than protect max length (" + PROTECT_MAX_LENGTH + ")!");
+ "Frame size (" + size + ") larger than protect max length (" + RpcUtils.FRAME_HARD_MAX_LENGTH
+ + ")!");
}
- if (size < maxLength) {
- readBuffer.resizeIfNecessary(maxLength);
+ if (size < maxSoftLength) {
+ readBuffer.resizeIfNecessary(maxSoftLength);
}
readBuffer.fill(underlying, size);
}
@@ -132,8 +133,8 @@ public class TElasticFramedTransport extends TTransport {
underlying.write(i32buf, 0, 4);
underlying.write(writeBuffer.getBuf().array(), 0, length);
writeBuffer.reset();
- if (length > maxLength) {
- writeBuffer.resizeIfNecessary(maxLength);
+ if (length > maxSoftLength) {
+ writeBuffer.resizeIfNecessary(maxSoftLength);
}
underlying.flush();
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 06f056c..5c4ed6f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -43,7 +43,7 @@ public class TSnappyElasticFramedTransport extends TCompressedElasticFramedTrans
@Override
public TTransport getTransport(TTransport trans) {
- return new TSnappyElasticFramedTransport(trans, initialCapacity, maxLength);
+ return new TSnappyElasticFramedTransport(trans, initialCapacity, maxSoftLength);
}
}