You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 02:14:30 UTC
[iotdb] branch cluster_add_snappy updated: fix buffer resize
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
new c986c26 fix buffer resize
c986c26 is described below
commit c986c26bfcf616e903149b612ac2b23f38db0a2e
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 10:13:05 2020 +0800
fix buffer resize
---
.../org/apache/iotdb/rpc/AutoExpandingBuffer.java | 10 +++-
.../apache/iotdb/rpc/TElasticFramedTransport.java | 58 +++-------------------
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 4 +-
3 files changed, 19 insertions(+), 53 deletions(-)
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
index 52490eb..4fe0963 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoExpandingBuffer.java
@@ -28,7 +28,11 @@ import java.util.Arrays;
* objective of avoiding expensive buffer allocations and copies.
*/
class AutoExpandingBuffer {
+ // if resizeIfNecessary is called with a small size more than MAX_BUFFER_OVERSIZE_TIME
+ // times since the buffer was last resized, we will shrink the buffer to reclaim space
+ private static final int MAX_BUFFER_OVERSIZE_TIME = 5;
private byte[] array;
+ private int bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
public AutoExpandingBuffer(int initialCapacity) {
this.array = new byte[initialCapacity];
@@ -36,14 +40,16 @@ class AutoExpandingBuffer {
public void resizeIfNecessary(int size) {
final int currentCapacity = this.array.length;
- final double loadFactor = 0.3;
+ final double loadFactor = 0.6;
if (currentCapacity < size) {
// Increase by a factor of 1.5x
int growCapacity = currentCapacity + (currentCapacity >> 1);
int newCapacity = Math.max(growCapacity, size);
this.array = Arrays.copyOf(array, newCapacity);
- } else if (array.length * loadFactor > size) {
+ bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
+ } else if (array.length * loadFactor > size && bufTooLargeCounter-- <= 0) {
array = Arrays.copyOf(array, size);
+ bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
index 3fddcf1..c2eaa1f 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TElasticFramedTransport.java
@@ -18,21 +18,14 @@
*/
package org.apache.iotdb.rpc;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
-import org.xerial.snappy.Snappy;
public class TElasticFramedTransport extends TFastFramedTransport {
- private TByteBuffer writeCompressBuffer;
- private TByteBuffer readCompressBuffer;
-
public static class Factory extends TTransportFactory {
private final int initialCapacity;
@@ -67,8 +60,6 @@ public class TElasticFramedTransport extends TFastFramedTransport {
this.maxLength = maxLength;
readBuffer = new AutoScalingBufferReadTransport(initialBufferCapacity);
writeBuffer = new AutoScalingBufferWriteTransport(initialBufferCapacity);
- writeCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
- readCompressBuffer = new TByteBuffer(ByteBuffer.allocate(initialBufferCapacity));
}
private final int maxLength;
@@ -99,55 +90,22 @@ public class TElasticFramedTransport extends TFastFramedTransport {
throw new TTransportException(TTransportException.CORRUPTED_DATA,
"Read a negative frame size (" + size + ")!");
}
-
- readBuffer.fill(underlying, size);
- try {
- int uncompressedLength = Snappy.uncompressedLength(readBuffer.getBuffer(), 0, size);
- readCompressBuffer = resizeCompressBuf(uncompressedLength, readCompressBuffer);
- Snappy.uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getByteBuffer().array(), 0);
- readCompressBuffer.getByteBuffer().limit(uncompressedLength);
- readCompressBuffer.getByteBuffer().position(0);
-
- if (uncompressedLength < maxLength) {
- readBuffer.resizeIfNecessary(maxLength);
- }
- readBuffer.fill(readCompressBuffer, uncompressedLength);
- } catch (IOException e) {
- throw new TTransportException(e);
- }
- }
-
- private TByteBuffer resizeCompressBuf(int size, TByteBuffer byteBuffer) {
- double expandFactor = 1.5;
- double loadFactor = 0.5;
- if (byteBuffer.getByteBuffer().capacity() < size) {
- int newCap = (int) Math.min(size * expandFactor, maxLength);
- byteBuffer = new TByteBuffer(ByteBuffer.allocate(newCap));
- } else if (byteBuffer.getByteBuffer().capacity() * loadFactor > size) {
- byteBuffer = new TByteBuffer(ByteBuffer.allocate(size));
+ if (size < maxLength) {
+ readBuffer.resizeIfNecessary(maxLength);
}
- return byteBuffer;
+ readBuffer.fill(underlying, size);
}
@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
TFramedTransport.encodeFrameSize(length, i32buf);
- try {
- int maxCompressedLength = Snappy.maxCompressedLength(length);
- writeCompressBuffer = resizeCompressBuf(maxCompressedLength, writeCompressBuffer);
- int compressedLength = Snappy.compress(writeBuffer.getBuf().array(), 0, length,
- writeCompressBuffer.getByteBuffer().array(), 0);
- TFramedTransport.encodeFrameSize(compressedLength, i32buf);
- underlying.write(i32buf, 0, 4);
-
- underlying.write(writeCompressBuffer.getByteBuffer().array(), 0, compressedLength);
- } catch (IOException e) {
- throw new TTransportException(e);
- }
-
+ underlying.write(i32buf, 0, 4);
+ underlying.write(writeBuffer.getBuf().array(), 0, length);
writeBuffer.reset();
- writeBuffer.resizeIfNecessary(maxLength);
+ if (length > maxLength) {
+ writeBuffer.resizeIfNecessary(maxLength);
+ }
underlying.flush();
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index 426b759..17a90b8 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -146,7 +146,9 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
}
writeBuffer.reset();
- writeBuffer.resizeIfNecessary(maxLength);
+ if (maxLength < length) {
+ writeBuffer.resizeIfNecessary(maxLength);
+ }
underlying.flush();
}