You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 05:34:38 UTC
[iotdb] branch cluster_add_snappy updated: add compression report
fix single node does not commit
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
new a184758 add compression report fix single node does not commit
a184758 is described below
commit a184758ead3556ceefbcfe352279c983c53c12cb
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 13:32:23 2020 +0800
add compression report
fix single node does not commit
---
.../apache/iotdb/cluster/server/NodeReport.java | 20 +++++++++++++++++
.../iotdb/cluster/server/member/RaftMember.java | 4 ++++
.../org/apache/iotdb/rpc/AutoResizingBuffer.java | 4 ++--
.../iotdb/rpc/TSnappyElasticFramedTransport.java | 26 ++++++++++++++++++++++
4 files changed, 52 insertions(+), 2 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
index 2934eef..0d84b0f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
@@ -19,9 +19,13 @@
package org.apache.iotdb.cluster.server;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSnappyElasticFramedTransport;
+import org.xerial.snappy.Snappy;
/**
* A node report collects the current runtime information of the local node, which contains:
@@ -108,6 +112,21 @@ public class NodeReport {
@Override
public String toString() {
+ long readBytes = TSnappyElasticFramedTransport.getReadBytes();
+ long readCompressedBytes = TSnappyElasticFramedTransport.getReadCompressedBytes();
+ long writeBytes = TSnappyElasticFramedTransport.getWriteBytes();
+ long writeCompressedBytes = TSnappyElasticFramedTransport.getWriteCompressedBytes();
+ double readCompressionRatio
+ = (double) readBytes / readCompressedBytes;
+ double writeCompressionRatio = (double) writeBytes / writeCompressedBytes;
+ String transportCompressionReport = "";
+ if (RpcTransportFactory.USE_SNAPPY) {
+ transportCompressionReport =
+ ", readBytes=" + readBytes + "/" + readCompressedBytes + "(" + readCompressionRatio +
+ ")" +
+ ", writeBytes=" + writeBytes + "/" + writeCompressedBytes + "(" + writeCompressionRatio +
+ ")";
+ }
return "MetaMemberReport{" +
"character=" + character +
", Leader=" + leader +
@@ -120,6 +139,7 @@ public class NodeReport {
", readOnly=" + isReadOnly +
", lastHeartbeat=" + (System.currentTimeMillis() - lastHeartbeatReceivedTime) + "ms ago" +
", logIncrement=" + (lastLogIndex - prevLastLogIndex) +
+ transportCompressionReport +
", \n timer: "+ Timer.getReport() +
'}';
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 245b135..694eeeb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1505,6 +1505,10 @@ public abstract class RaftMember {
boolean appendLogInGroup(Log log) throws LogExecutionException {
if (allNodes.size() == 1) {
// single node group, no followers
+ long startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
+ logger.debug("{}: log {} is accepted", name, log);
+ commitLog(log);
+ Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
return true;
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
index 10243a5..c7ec4e8 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
@@ -59,12 +59,12 @@ class AutoResizingBuffer {
int newCapacity = Math.max(growCapacity, size);
this.array = Arrays.copyOf(array, newCapacity);
bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
- logger.info("{} expand from {} to {}, request: {}", this, currentCapacity, newCapacity, size);
+ logger.debug("{} expand from {} to {}, request: {}", this, currentCapacity, newCapacity, size);
} else if (size > initialCapacity && currentCapacity * loadFactor > size && bufTooLargeCounter-- <= 0) {
// do not resize if it is reading the request size
array = Arrays.copyOf(array, size + (currentCapacity - size) / 2);
bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
- logger.info("{} shrink from {} to {}", this, currentCapacity, size);
+ logger.debug("{} shrink from {} to {}", this, currentCapacity, size);
}
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index e883157..cc1b01e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.rpc;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.transport.TByteBuffer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TFramedTransport;
@@ -30,6 +31,11 @@ import org.xerial.snappy.Snappy;
public class TSnappyElasticFramedTransport extends TFastFramedTransport {
+ private static final AtomicLong writeBytes = new AtomicLong();
+ private static final AtomicLong writeCompressedBytes = new AtomicLong();
+ private static final AtomicLong readBytes = new AtomicLong();
+ private static final AtomicLong readCompressedBytes = new AtomicLong();
+
private TByteBuffer writeCompressBuffer;
private TByteBuffer readCompressBuffer;
@@ -101,8 +107,10 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
}
readBuffer.fill(underlying, size);
+ readCompressedBytes.addAndGet(size);
try {
int uncompressedLength = Snappy.uncompressedLength(readBuffer.getBuffer(), 0, size);
+ readBytes.addAndGet(uncompressedLength);
readCompressBuffer = resizeCompressBuf(uncompressedLength, readCompressBuffer);
Snappy.uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getByteBuffer().array(), 0);
readCompressBuffer.getByteBuffer().limit(uncompressedLength);
@@ -129,11 +137,13 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
+ writeBytes.addAndGet(length);
try {
int maxCompressedLength = Snappy.maxCompressedLength(length);
writeCompressBuffer = resizeCompressBuf(maxCompressedLength, writeCompressBuffer);
int compressedLength = Snappy.compress(writeBuffer.getBuf().array(), 0, length,
writeCompressBuffer.getByteBuffer().array(), 0);
+ writeCompressedBytes.addAndGet(compressedLength);
TFramedTransport.encodeFrameSize(compressedLength, i32buf);
underlying.write(i32buf, 0, 4);
@@ -153,4 +163,20 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
public void write(byte[] buf, int off, int len) {
writeBuffer.write(buf, off, len);
}
+
+ public static long getReadBytes() {
+ return readBytes.get();
+ }
+
+ public static long getReadCompressedBytes() {
+ return readCompressedBytes.get();
+ }
+
+ public static long getWriteBytes() {
+ return writeBytes.get();
+ }
+
+ public static long getWriteCompressedBytes() {
+ return writeCompressedBytes.get();
+ }
}