You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/29 05:18:24 UTC

[iotdb] branch native_raft updated: use individual compressor for dispatcher threads

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/native_raft by this push:
     new f95f0bf65ab use individual compressor for dispatcher threads
f95f0bf65ab is described below

commit f95f0bf65ab5e69bf4b73db39b1bb2e478536190
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon May 29 13:21:13 2023 +0800

    use individual compressor for dispatcher threads
---
 .../natraft/protocol/log/dispatch/DispatcherThread.java          | 9 ++++++---
 .../consensus/natraft/protocol/log/dispatch/LogDispatcher.java   | 3 ---
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 694a1eb99ea..1c929ece745 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
 import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
 import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.iotdb.tsfile.compress.ICompressor;
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
 
 import org.apache.thrift.async.AsyncMethodCallback;
@@ -56,6 +57,7 @@ class DispatcherThread extends DynamicThread {
   private long lastDispatchTime;
   private PublicBAOS batchLogBuffer = new PublicBAOS(64 * 1024);
   private AtomicReference<byte[]> compressionBuffer = new AtomicReference<>(new byte[64 * 1024]);
+  protected ICompressor compressor;
 
   protected DispatcherThread(
       LogDispatcher logDispatcher,
@@ -67,6 +69,8 @@ class DispatcherThread extends DynamicThread {
     this.receiver = receiver;
     this.logBlockingDeque = logBlockingDeque;
     this.group = group;
+    this.compressor =
+        ICompressor.getCompressor(logDispatcher.getConfig().getDispatchingCompressionType());
   }
 
   @Override
@@ -203,9 +207,8 @@ class DispatcherThread extends DynamicThread {
     request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
     request.setTerm(logDispatcher.member.getStatus().getTerm().get());
     request.setEntryBytes(
-        LogUtils.compressEntries(
-            logList, logDispatcher.compressor, request, batchLogBuffer, compressionBuffer));
-    request.setCompressionType((byte) logDispatcher.compressor.getType().ordinal());
+        LogUtils.compressEntries(logList, compressor, request, batchLogBuffer, compressionBuffer));
+    request.setCompressionType((byte) compressor.getType().ordinal());
     return request;
   }
 
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 13713c08891..5225e5e69fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
 import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
 import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
-import org.apache.iotdb.tsfile.compress.ICompressor;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,7 +56,6 @@ public class LogDispatcher {
   protected Map<Peer, Double> nodesRate = new HashMap<>();
   protected boolean queueOrdered;
   protected boolean enableCompressedDispatching;
-  protected ICompressor compressor;
   public int bindingThreadNum;
   public int maxBatchSize = 10;
 
@@ -66,7 +64,6 @@ public class LogDispatcher {
     this.config = config;
     this.queueOrdered = !(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance());
     this.enableCompressedDispatching = config.isEnableCompressedDispatching();
-    this.compressor = ICompressor.getCompressor(config.getDispatchingCompressionType());
     this.bindingThreadNum = config.getDispatcherBindingThreadNum();
     this.allNodes = member.getAllNodes();
     this.newNodes = member.getNewNodes();