You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2020/12/03 05:34:38 UTC

[iotdb] branch cluster_add_snappy updated: add compression report; fix single node does not commit

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch cluster_add_snappy
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/cluster_add_snappy by this push:
     new a184758  add compression report; fix single node does not commit
a184758 is described below

commit a184758ead3556ceefbcfe352279c983c53c12cb
Author: jt <jt...@163.com>
AuthorDate: Thu Dec 3 13:32:23 2020 +0800

    add compression report
    fix single node does not commit
---
 .../apache/iotdb/cluster/server/NodeReport.java    | 20 +++++++++++++++++
 .../iotdb/cluster/server/member/RaftMember.java    |  4 ++++
 .../org/apache/iotdb/rpc/AutoResizingBuffer.java   |  4 ++--
 .../iotdb/rpc/TSnappyElasticFramedTransport.java   | 26 ++++++++++++++++++++++
 4 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
index 2934eef..0d84b0f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/NodeReport.java
@@ -19,9 +19,13 @@
 
 package org.apache.iotdb.cluster.server;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.iotdb.cluster.rpc.thrift.Node;
+import org.apache.iotdb.rpc.RpcTransportFactory;
+import org.apache.iotdb.rpc.TSnappyElasticFramedTransport;
+import org.xerial.snappy.Snappy;
 
 /**
  * A node report collects the current runtime information of the local node, which contains:
@@ -108,6 +112,21 @@ public class NodeReport {
 
     @Override
     public String toString() {
+      long readBytes = TSnappyElasticFramedTransport.getReadBytes();
+      long readCompressedBytes = TSnappyElasticFramedTransport.getReadCompressedBytes();
+      long writeBytes = TSnappyElasticFramedTransport.getWriteBytes();
+      long writeCompressedBytes = TSnappyElasticFramedTransport.getWriteCompressedBytes();
+      double readCompressionRatio
+          = (double) readBytes / readCompressedBytes;
+      double writeCompressionRatio = (double) writeBytes / writeCompressedBytes;
+      String transportCompressionReport = "";
+      if (RpcTransportFactory.USE_SNAPPY) {
+        transportCompressionReport =
+            ", readBytes=" + readBytes + "/" + readCompressedBytes + "(" + readCompressionRatio +
+                ")" +
+                ", writeBytes=" + writeBytes + "/" + writeCompressedBytes + "(" + writeCompressionRatio +
+                ")";
+      }
       return "MetaMemberReport{" +
           "character=" + character +
           ", Leader=" + leader +
@@ -120,6 +139,7 @@ public class NodeReport {
           ", readOnly=" + isReadOnly +
           ", lastHeartbeat=" + (System.currentTimeMillis() - lastHeartbeatReceivedTime) + "ms ago" +
           ", logIncrement=" + (lastLogIndex - prevLastLogIndex) +
+          transportCompressionReport +
           ", \n timer: "+ Timer.getReport() +
           '}';
     }
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 245b135..694eeeb 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -1505,6 +1505,10 @@ public abstract class RaftMember {
   boolean appendLogInGroup(Log log) throws LogExecutionException {
     if (allNodes.size() == 1) {
       // single node group, no followers
+      long startTime = Timer.Statistic.RAFT_SENDER_COMMIT_LOG.getOperationStartTime();
+      logger.debug("{}: log {} is accepted", name, log);
+      commitLog(log);
+      Timer.Statistic.RAFT_SENDER_COMMIT_LOG.calOperationCostTimeFromStart(startTime);
       return true;
     }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
index 10243a5..c7ec4e8 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/AutoResizingBuffer.java
@@ -59,12 +59,12 @@ class AutoResizingBuffer {
       int newCapacity = Math.max(growCapacity, size);
       this.array = Arrays.copyOf(array, newCapacity);
       bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
-      logger.info("{} expand from {} to {}, request: {}", this, currentCapacity, newCapacity, size);
+      logger.debug("{} expand from {} to {}, request: {}", this, currentCapacity, newCapacity, size);
     } else if (size > initialCapacity && currentCapacity * loadFactor > size && bufTooLargeCounter-- <= 0) {
       // do not resize if it is reading the request size
       array = Arrays.copyOf(array, size + (currentCapacity - size) / 2);
       bufTooLargeCounter = MAX_BUFFER_OVERSIZE_TIME;
-      logger.info("{} shrink from {} to {}", this, currentCapacity, size);
+      logger.debug("{} shrink from {} to {}", this, currentCapacity, size);
     }
   }
 
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
index e883157..cc1b01e 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSnappyElasticFramedTransport.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.rpc;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.thrift.transport.TByteBuffer;
 import org.apache.thrift.transport.TFastFramedTransport;
 import org.apache.thrift.transport.TFramedTransport;
@@ -30,6 +31,11 @@ import org.xerial.snappy.Snappy;
 
 public class TSnappyElasticFramedTransport extends TFastFramedTransport {
 
+  private static final AtomicLong writeBytes = new AtomicLong();
+  private static final AtomicLong writeCompressedBytes = new AtomicLong();
+  private static final AtomicLong readBytes = new AtomicLong();
+  private static final AtomicLong readCompressedBytes = new AtomicLong();
+
   private TByteBuffer writeCompressBuffer;
   private TByteBuffer readCompressBuffer;
 
@@ -101,8 +107,10 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
     }
 
     readBuffer.fill(underlying, size);
+    readCompressedBytes.addAndGet(size);
     try {
       int uncompressedLength = Snappy.uncompressedLength(readBuffer.getBuffer(), 0, size);
+      readBytes.addAndGet(uncompressedLength);
       readCompressBuffer = resizeCompressBuf(uncompressedLength, readCompressBuffer);
       Snappy.uncompress(readBuffer.getBuffer(), 0, size, readCompressBuffer.getByteBuffer().array(), 0);
       readCompressBuffer.getByteBuffer().limit(uncompressedLength);
@@ -129,11 +137,13 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
   @Override
   public void flush() throws TTransportException {
     int length = writeBuffer.getPos();
+    writeBytes.addAndGet(length);
     try {
       int maxCompressedLength = Snappy.maxCompressedLength(length);
       writeCompressBuffer = resizeCompressBuf(maxCompressedLength, writeCompressBuffer);
       int compressedLength = Snappy.compress(writeBuffer.getBuf().array(), 0, length,
           writeCompressBuffer.getByteBuffer().array(), 0);
+      writeCompressedBytes.addAndGet(compressedLength);
       TFramedTransport.encodeFrameSize(compressedLength, i32buf);
       underlying.write(i32buf, 0, 4);
 
@@ -153,4 +163,20 @@ public class TSnappyElasticFramedTransport extends TFastFramedTransport {
   public void write(byte[] buf, int off, int len) {
     writeBuffer.write(buf, off, len);
   }
+
+  public static long getReadBytes() {
+    return readBytes.get();
+  }
+
+  public static long getReadCompressedBytes() {
+    return readCompressedBytes.get();
+  }
+
+  public static long getWriteBytes() {
+    return writeBytes.get();
+  }
+
+  public static long getWriteCompressedBytes() {
+    return writeCompressedBytes.get();
+  }
 }