You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/05/29 05:18:24 UTC
[iotdb] branch native_raft updated: use individual compressor for dispatcher threads
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new f95f0bf65ab use individual compressor for dispatcher threads
f95f0bf65ab is described below
commit f95f0bf65ab5e69bf4b73db39b1bb2e478536190
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon May 29 13:21:13 2023 +0800
use individual compressor for dispatcher threads
---
.../natraft/protocol/log/dispatch/DispatcherThread.java | 9 ++++++---
.../consensus/natraft/protocol/log/dispatch/LogDispatcher.java | 3 ---
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
index 694a1eb99ea..1c929ece745 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/DispatcherThread.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -56,6 +57,7 @@ class DispatcherThread extends DynamicThread {
private long lastDispatchTime;
private PublicBAOS batchLogBuffer = new PublicBAOS(64 * 1024);
private AtomicReference<byte[]> compressionBuffer = new AtomicReference<>(new byte[64 * 1024]);
+ protected ICompressor compressor;
protected DispatcherThread(
LogDispatcher logDispatcher,
@@ -67,6 +69,8 @@ class DispatcherThread extends DynamicThread {
this.receiver = receiver;
this.logBlockingDeque = logBlockingDeque;
this.group = group;
+ this.compressor =
+ ICompressor.getCompressor(logDispatcher.getConfig().getDispatchingCompressionType());
}
@Override
@@ -203,9 +207,8 @@ class DispatcherThread extends DynamicThread {
request.setLeaderCommit(logDispatcher.member.getLogManager().getCommitLogIndex());
request.setTerm(logDispatcher.member.getStatus().getTerm().get());
request.setEntryBytes(
- LogUtils.compressEntries(
- logList, logDispatcher.compressor, request, batchLogBuffer, compressionBuffer));
- request.setCompressionType((byte) logDispatcher.compressor.getType().ordinal());
+ LogUtils.compressEntries(logList, compressor, request, batchLogBuffer, compressionBuffer));
+ request.setCompressionType((byte) compressor.getType().ordinal());
return request;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 13713c08891..5225e5e69fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
-import org.apache.iotdb.tsfile.compress.ICompressor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +56,6 @@ public class LogDispatcher {
protected Map<Peer, Double> nodesRate = new HashMap<>();
protected boolean queueOrdered;
protected boolean enableCompressedDispatching;
- protected ICompressor compressor;
public int bindingThreadNum;
public int maxBatchSize = 10;
@@ -66,7 +64,6 @@ public class LogDispatcher {
this.config = config;
this.queueOrdered = !(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance());
this.enableCompressedDispatching = config.isEnableCompressedDispatching();
- this.compressor = ICompressor.getCompressor(config.getDispatchingCompressionType());
this.bindingThreadNum = config.getDispatcherBindingThreadNum();
this.allNodes = member.getAllNodes();
this.newNodes = member.getNewNodes();