You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labelled "here") was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/13 06:02:09 UTC
[iotdb] branch native_raft updated: add compressed append entries
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 8f90527226 add compressed append entries
8f90527226 is described below
commit 8f90527226d3f5f341405ffcf16d870a42f0097a
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Mar 13 14:03:54 2023 +0800
add compressed append entries
---
.../natraft/client/SyncClientAdaptor.java | 9 ++
.../consensus/natraft/protocol/RaftConfig.java | 20 ++++
.../consensus/natraft/protocol/RaftMember.java | 25 ++---
.../protocol/log/appender/BlockingLogAppender.java | 108 ++++++---------------
.../natraft/protocol/log/appender/LogAppender.java | 4 +-
.../log/appender/SlidingWindowLogAppender.java | 16 +--
.../protocol/log/dispatch/LogDispatcher.java | 73 +++++++++++++-
.../natraft/service/RaftRPCServiceProcessor.java | 40 ++++++++
.../iotdb/consensus/natraft/utils/LogUtils.java | 77 +++++++++++++++
.../iotdb/consensus/natraft/utils/Timer.java | 30 +++++-
thrift-raft/src/main/thrift/raft.thrift | 14 +++
11 files changed, 304 insertions(+), 112 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index 82196bf9fa..b1977de7d1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -9,6 +9,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ExecuteReq;
@@ -83,6 +84,14 @@ public class SyncClientAdaptor {
return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
}
+ public static AppendEntryResult appendCompressedEntries(
+ AsyncRaftServiceClient client, AppendCompressedEntriesRequest request)
+ throws TException, InterruptedException {
+ GenericHandler<AppendEntryResult> matchTermHandler = new GenericHandler<>(client.getEndpoint());
+ client.appendCompressedEntries(request, matchTermHandler);
+ return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
+ }
+
public static TSStatus forceElection(AsyncRaftServiceClient client, ConsensusGroupId groupId)
throws TException, InterruptedException {
GenericHandler<TSStatus> matchTermHandler = new GenericHandler<>(client.getEndpoint());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 78fb556cbd..6414a0ccfc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -25,6 +25,7 @@ package org.apache.iotdb.consensus.natraft.protocol;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RPCConfig;
import org.apache.iotdb.consensus.natraft.protocol.consistency.ConsistencyLevel;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import java.io.File;
import java.util.concurrent.TimeUnit;
@@ -89,6 +90,9 @@ public class RaftConfig {
private int flushRaftLogThreshold = 100_000;
private long maxSyncLogLag = 100_000;
private long syncLeaderMaxWaitMs = 30_000;
+
+ private boolean enableCompressedDispatching = true;
+ private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
private RPCConfig rpcConfig;
@@ -423,4 +427,20 @@ public class RaftConfig {
public RPCConfig getRpcConfig() {
return rpcConfig;
}
+
+ public boolean isEnableCompressedDispatching() {
+ return enableCompressedDispatching;
+ }
+
+ public void setEnableCompressedDispatching(boolean enableCompressedDispatching) {
+ this.enableCompressedDispatching = enableCompressedDispatching;
+ }
+
+ public CompressionType getDispatchingCompressionType() {
+ return dispatchingCompressionType;
+ }
+
+ public void setDispatchingCompressionType(CompressionType dispatchingCompressionType) {
+ this.dispatchingCompressionType = dispatchingCompressionType;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index ea26a6d3c9..8d96d73179 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -47,11 +47,11 @@ import org.apache.iotdb.consensus.natraft.protocol.heartbeat.ElectionReqHandler;
import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatReqHandler;
import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatThread;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.appender.BlockingLogAppender;
import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppender;
import org.apache.iotdb.consensus.natraft.protocol.log.appender.LogAppenderFactory;
+import org.apache.iotdb.consensus.natraft.protocol.log.appender.SlidingWindowLogAppender.Factory;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.AsyncLogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.BaseApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.catchup.CatchUpManager;
@@ -93,7 +93,6 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -116,7 +115,7 @@ public class RaftMember {
private static final Logger logger = LoggerFactory.getLogger(RaftMember.class);
private RaftConfig config;
- protected static final LogAppenderFactory appenderFactory = new BlockingLogAppender.Factory();
+ protected final LogAppenderFactory appenderFactory;
protected static final LogSequencerFactory SEQUENCER_FACTORY = new SynchronousSequencer.Factory();
@@ -235,6 +234,8 @@ public class RaftMember {
stateMachine,
config,
this::examineUnappliedEntry);
+ this.appenderFactory =
+ config.isUseFollowerSlidingWindow() ? new Factory() : new BlockingLogAppender.Factory();
this.logAppender = appenderFactory.create(this, config);
this.logSequencer = SEQUENCER_FACTORY.create(this, config);
this.logDispatcher = new LogDispatcher(this, config);
@@ -517,21 +518,11 @@ public class RaftMember {
}
AppendEntryResult response;
- List<Entry> entries = new ArrayList<>();
- for (ByteBuffer buffer : request.getEntries()) {
- buffer.mark();
- Entry e;
- try {
- e = LogParser.getINSTANCE().parse(buffer);
- e.setByteSize(buffer.limit() - buffer.position());
- } catch (BufferUnderflowException ex) {
- buffer.reset();
- throw ex;
- }
- entries.add(e);
- }
+ List<Entry> entries = LogUtils.parseEntries(request.entries);
- response = logAppender.appendEntries(request, entries);
+ response =
+ logAppender.appendEntries(
+ request.prevLogIndex, request.prevLogTerm, request.leaderCommit, request.term, entries);
if (logger.isDebugEnabled()) {
logger.debug(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index ed4cb9bff0..8138562e3e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -25,14 +25,12 @@ import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
import org.apache.iotdb.consensus.natraft.utils.Response;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -56,56 +54,6 @@ public class BlockingLogAppender implements LogAppender {
this.config = config;
}
- /**
- * Find the local previous log of "log". If such log is found, discard all local logs behind it
- * and append "log" to it. Otherwise report a log mismatch.
- *
- * @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
- */
- public AppendEntryResult appendEntry(AppendEntryRequest request, Entry log) {
- long resp = checkPrevLogIndex(request.prevLogIndex);
- if (resp != Response.RESPONSE_AGREE) {
- return new AppendEntryResult(resp)
- .setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
- }
-
- long startWaitingTime = System.currentTimeMillis();
- long success;
- AppendEntryResult result = new AppendEntryResult();
- while (true) {
- // TODO: Consider memory footprint to execute a precise rejection
- if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
- <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
- success =
- logManager.maybeAppend(
- request.prevLogIndex, request.prevLogTerm, Collections.singletonList(log));
- member.tryUpdateCommitIndex(
- request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
- break;
- }
- try {
- TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
- if (System.currentTimeMillis() - startWaitingTime
- > config.getMaxWaitingTimeWhenInsertBlocked()) {
- result.status = Response.RESPONSE_TOO_BUSY;
- return result;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- if (success != -1) {
- logger.debug("{} append a new log {}", member.getName(), log);
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- } else {
- // the incoming log points to an illegal position, reject it
- result.status = Response.RESPONSE_LOG_MISMATCH;
- }
- result.setGroupId(request.getGroupId());
- return result;
- }
-
/** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
@@ -153,19 +101,23 @@ public class BlockingLogAppender implements LogAppender {
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> logs) {
+ public AppendEntryResult appendEntries(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> logs) {
logger.debug(
"{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
member.getName(),
- request.prevLogIndex,
- request.prevLogTerm,
- request.leaderCommit);
+ prevLogIndex,
+ prevLogTerm,
+ leaderCommit);
if (logs.isEmpty()) {
return new AppendEntryResult(Response.RESPONSE_AGREE)
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
}
- long resp = checkPrevLogIndex(request.prevLogIndex);
+ long startTime = Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
+ long resp = checkPrevLogIndex(prevLogIndex);
+ Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
+
if (resp != Response.RESPONSE_AGREE) {
return new AppendEntryResult(resp)
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -178,6 +130,7 @@ public class BlockingLogAppender implements LogAppender {
}
}
+ startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY.getOperationStartTime();
AppendEntryResult result = new AppendEntryResult();
long startWaitingTime = System.currentTimeMillis();
while (true) {
@@ -185,8 +138,10 @@ public class BlockingLogAppender implements LogAppender {
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
resp =
lastConfigEntry == null
- ? appendWithoutConfigChange(request, logs, result)
- : appendWithConfigChange(request, logs, result, lastConfigEntry);
+ ? appendWithoutConfigChange(
+ prevLogIndex, prevLogTerm, leaderCommit, term, logs, result)
+ : appendWithConfigChange(
+ prevLogIndex, prevLogTerm, leaderCommit, term, logs, result, lastConfigEntry);
break;
}
@@ -201,35 +156,35 @@ public class BlockingLogAppender implements LogAppender {
Thread.currentThread().interrupt();
}
}
+ Statistic.RAFT_RECEIVER_APPEND_ENTRY.calOperationCostTimeFromStart(startTime);
return result;
}
protected long appendWithConfigChange(
- AppendEntriesRequest request,
+ long prevLogIndex,
+ long prevLogTerm,
+ long leaderCommit,
+ long term,
List<Entry> logs,
AppendEntryResult result,
ConfigChangeEntry configChangeEntry) {
long resp;
try {
logManager.getLock().writeLock().lock();
- resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
+ resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
logger.debug(
- "{} append a new log list {}, commit to {}",
- member.getName(),
- logs,
- request.leaderCommit);
+ "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
}
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
member.setNewNodes(configChangeEntry.getNewPeers());
- member.tryUpdateCommitIndex(
- request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
+ member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
} else {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -241,21 +196,22 @@ public class BlockingLogAppender implements LogAppender {
}
protected long appendWithoutConfigChange(
- AppendEntriesRequest request, List<Entry> logs, AppendEntryResult result) {
- long resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
+ long prevLogIndex,
+ long prevLogTerm,
+ long leaderCommit,
+ long term,
+ List<Entry> logs,
+ AppendEntryResult result) {
+ long resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
logger.debug(
- "{} append a new log list {}, commit to {}",
- member.getName(),
- logs,
- request.leaderCommit);
+ "{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
}
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
- member.tryUpdateCommitIndex(
- request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
+ member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
} else {
// the incoming log points to an illegal position, reject it
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
index 2ec3eceb52..1d0bb8f56d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.appender;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import java.util.List;
@@ -31,7 +30,8 @@ import java.util.List;
*/
public interface LogAppender {
- AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> entries);
+ AppendEntryResult appendEntries(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries);
void reset();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index ab89f354cf..0985204aa5 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -24,7 +24,6 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
import org.apache.iotdb.consensus.natraft.utils.Response;
-import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.slf4j.Logger;
@@ -62,8 +61,6 @@ public class SlidingWindowLogAppender implements LogAppender {
/**
* After insert an entry into the window, check if its previous and latter entries should be
* removed if it mismatches.
- *
- * @param pos
*/
private void checkLog(int pos) {
checkLogPrev(pos);
@@ -108,10 +105,6 @@ public class SlidingWindowLogAppender implements LogAppender {
/**
* Flush window range [0, flushPos) into the LogManager, where flushPos is the first null position
* in the window.
- *
- * @param result
- * @param leaderCommit
- * @return
*/
private long flushWindow(AppendEntryResult result, long leaderCommit) {
long windowPrevLogIndex = firstPosPrevIndex;
@@ -183,7 +176,8 @@ public class SlidingWindowLogAppender implements LogAppender {
}
@Override
- public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> entries) {
+ public AppendEntryResult appendEntries(
+ long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries) {
if (entries.isEmpty()) {
return new AppendEntryResult(Response.RESPONSE_AGREE)
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -191,15 +185,15 @@ public class SlidingWindowLogAppender implements LogAppender {
AppendEntryResult result = null;
for (Entry entry : entries) {
- result = appendEntry(request.prevLogIndex, request.prevLogTerm, request.leaderCommit, entry);
+ result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, entry);
if (result.status != Response.RESPONSE_AGREE
&& result.status != Response.RESPONSE_STRONG_ACCEPT
&& result.status != Response.RESPONSE_WEAK_ACCEPT) {
return result;
}
- request.prevLogIndex = entry.getCurrLogIndex();
- request.prevLogTerm = entry.getCurrLogTerm();
+ prevLogIndex = entry.getCurrLogIndex();
+ prevLogTerm = entry.getCurrLogTerm();
}
return result;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 27ea860aad..3b83a0cd7d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -29,8 +29,12 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+import org.apache.iotdb.tsfile.compress.ICompressor;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -74,6 +78,8 @@ public class LogDispatcher {
protected ExecutorService resultHandlerThread =
IoTDBThreadPoolFactory.newFixedThreadPool(2, "AppendResultHandler");
protected boolean queueOrdered;
+ protected boolean enableCompressedDispatching;
+ protected ICompressor compressor;
public int bindingThreadNum;
public static int maxBatchSize = 10;
@@ -82,6 +88,8 @@ public class LogDispatcher {
this.member = member;
this.config = config;
this.queueOrdered = !(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance());
+ this.enableCompressedDispatching = config.isEnableCompressedDispatching();
+ this.compressor = ICompressor.getCompressor(config.getDispatchingCompressionType());
this.bindingThreadNum = config.getDispatcherBindingThreadNum();
if (!queueOrdered) {
maxBatchSize = 1;
@@ -266,7 +274,30 @@ public class LogDispatcher {
AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
try {
+ long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
AppendEntryResult appendEntryResult = SyncClientAdaptor.appendEntries(client, request);
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
+ handler.onComplete(appendEntryResult);
+ } catch (Exception e) {
+ handler.onError(e);
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "{}: append entries {} with {} logs", member.getName(), receiver, logList.size());
+ }
+ }
+
+ private void appendEntriesAsync(
+ List<ByteBuffer> logList,
+ AppendCompressedEntriesRequest request,
+ List<VotingEntry> currBatch) {
+ AsyncMethodCallback<AppendEntryResult> handler = new AppendEntriesHandler(currBatch);
+ AsyncRaftServiceClient client = member.getClient(receiver.getEndpoint());
+ try {
+ long startTime = Statistic.RAFT_SENDER_SEND_LOG.getOperationStartTime();
+ AppendEntryResult appendEntryResult =
+ SyncClientAdaptor.appendCompressedEntries(client, request);
+ Statistic.RAFT_SENDER_SEND_LOG.calOperationCostTimeFromStart(startTime);
handler.onComplete(appendEntryResult);
} catch (Exception e) {
handler.onError(e);
@@ -299,6 +330,29 @@ public class LogDispatcher {
return request;
}
+ protected AppendCompressedEntriesRequest prepareCompressedRequest(
+ List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
+ AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
+
+ request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
+ request.setLeader(member.getThisNode().getEndpoint());
+ request.setLeaderId(member.getThisNode().getNodeId());
+ request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
+
+ request.setTerm(member.getStatus().getTerm().get());
+
+ request.setEntryBytes(LogUtils.compressEntries(logList, compressor));
+ request.setCompressionType((byte) compressor.getType().ordinal());
+ // set index for raft
+ request.setPrevLogIndex(currBatch.get(firstIndex).getEntry().getCurrLogIndex() - 1);
+ try {
+ request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
+ } catch (Exception e) {
+ logger.error("getTerm failed for newly append entries", e);
+ }
+ return request;
+ }
+
private void sendLogs(List<VotingEntry> currBatch) {
if (currBatch.isEmpty()) {
return;
@@ -316,21 +370,30 @@ public class LogDispatcher {
int prevIndex = logIndex;
for (; logIndex < currBatch.size(); logIndex++) {
- long curSize = currBatch.get(logIndex).getAppendEntryRequest().entry.array().length;
+ VotingEntry entry = currBatch.get(logIndex);
+ long curSize = entry.getAppendEntryRequest().entry.array().length;
if (logSizeLimit - curSize - logSize <= IoTDBConstant.LEFT_SIZE_IN_REQUEST) {
break;
}
logSize += curSize;
- logList.add(currBatch.get(logIndex).getAppendEntryRequest().entry);
+ logList.add(entry.getAppendEntryRequest().entry);
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_SENDING.calOperationCostTimeFromStart(
+ entry.getEntry().createTime);
+ }
+
+ if (!enableCompressedDispatching) {
+ AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
+ appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
+ } else {
+ AppendCompressedEntriesRequest appendEntriesRequest =
+ prepareCompressedRequest(logList, currBatch, prevIndex);
+ appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
}
- AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
if (config.isUseFollowerLoadBalance()) {
FlowMonitorManager.INSTANCE.report(receiver, logSize);
}
nodesRateLimiter.get(receiver).acquire((int) logSize);
-
- appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index df31eeba40..fced0f868a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -32,6 +32,9 @@ import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.ConfigChangeEntry;
import org.apache.iotdb.consensus.natraft.utils.IOUtils;
+import org.apache.iotdb.consensus.natraft.utils.LogUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+import org.apache.iotdb.consensus.raft.thrift.AppendCompressedEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -42,6 +45,8 @@ import org.apache.iotdb.consensus.raft.thrift.NoMemberException;
import org.apache.iotdb.consensus.raft.thrift.RaftService;
import org.apache.iotdb.consensus.raft.thrift.RequestCommitIndexResponse;
import org.apache.iotdb.consensus.raft.thrift.SendSnapshotRequest;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
@@ -50,6 +55,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
@@ -135,12 +141,46 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
public void appendEntries(
AppendEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
throws TException {
+ long startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
RaftMember member = getMemberOrCreate(request.groupId, request);
try {
resultHandler.onComplete(member.appendEntries(request));
} catch (UnknownLogTypeException e) {
throw new TException(e);
}
+ Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
+ }
+
+ @Override
+ public void appendCompressedEntries(
+ AppendCompressedEntriesRequest request, AsyncMethodCallback<AppendEntryResult> resultHandler)
+ throws TException {
+ long startTime = Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.getOperationStartTime();
+ AppendEntriesRequest decompressedRequest = new AppendEntriesRequest();
+ decompressedRequest
+ .setTerm(request.getTerm())
+ .setLeader(request.leader)
+ .setPrevLogIndex(request.prevLogIndex)
+ .setPrevLogTerm(request.prevLogTerm)
+ .setLeaderCommit(request.leaderCommit)
+ .setGroupId(request.groupId)
+ .setLeaderId(request.leaderId);
+
+ try {
+ long compressionStartTime = Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.getOperationStartTime();
+ List<ByteBuffer> buffers =
+ LogUtils.decompressEntries(
+ request.entryBytes,
+ IUnCompressor.getUnCompressor(CompressionType.values()[request.compressionType]));
+ decompressedRequest.setEntries(buffers);
+ Statistic.RAFT_RECEIVER_DECOMPRESS_ENTRY.calOperationCostTimeFromStart(compressionStartTime);
+
+ RaftMember member = getMemberOrCreate(request.groupId, decompressedRequest);
+ resultHandler.onComplete(member.appendEntries(decompressedRequest));
+ } catch (UnknownLogTypeException | IOException e) {
+ throw new TException(e);
+ }
+ Statistic.RAFT_RECEIVER_APPEND_ENTRY_FULL.calOperationCostTimeFromStart(startTime);
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index 745e1d5fef..c536763f37 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -19,15 +19,26 @@
package org.apache.iotdb.consensus.natraft.utils;
+import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+import org.apache.iotdb.tsfile.compress.ICompressor;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
public class LogUtils {
@@ -77,4 +88,70 @@ public class LogUtils {
}
return sendLogRequest;
}
+
+  /**
+   * Packs a batch of serialized log entries into one length-prefixed byte stream and compresses it.
+   *
+   * <p>Layout of the uncompressed stream (as written by {@link DataOutputStream}): {@code int
+   * count}, then for every entry {@code int length} followed by {@code length} bytes. {@link
+   * #decompressEntries(ByteBuffer, IUnCompressor)} reverses this layout.
+   *
+   * <p>Only the remaining bytes (position to limit) of each buffer are written, and the buffers'
+   * positions are left untouched. Heap, direct and read-only buffers are all accepted.
+   *
+   * @param entryByteList serialized entries to pack
+   * @param compressor compressor applied to the packed stream
+   * @return a buffer wrapping the compressed bytes, or {@code null} if compression failed (the
+   *     failure is logged at WARN level)
+   */
+  public static ByteBuffer compressEntries(List<ByteBuffer> entryByteList, ICompressor compressor) {
+    PublicBAOS baos = new PublicBAOS();
+    DataOutputStream dataOutputStream = new DataOutputStream(baos);
+    try {
+      dataOutputStream.writeInt(entryByteList.size());
+      for (ByteBuffer byteBuffer : entryByteList) {
+        dataOutputStream.writeInt(byteBuffer.remaining());
+        writeRemainingEntryBytes(dataOutputStream, byteBuffer);
+      }
+      Statistic.LOG_DISPATCHER_RAW_SIZE.add(baos.size());
+      byte[] compressed = compressor.compress(baos.getBuf(), 0, baos.size());
+      Statistic.LOG_DISPATCHER_COMPRESSED_SIZE.add(compressed.length);
+      return ByteBuffer.wrap(compressed);
+    } catch (IOException e) {
+      logger.warn("Failed to compress entries", e);
+      return null;
+    }
+  }
+
+  /** Writes the remaining bytes of {@code buffer} to {@code out} without moving its position. */
+  private static void writeRemainingEntryBytes(DataOutputStream out, ByteBuffer buffer)
+      throws IOException {
+    if (buffer.hasArray()) {
+      // fast path: copy straight from the accessible backing array
+      out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+    } else {
+      // direct or read-only buffers expose no array (array() would throw); copy via a duplicate
+      // so the caller's position is not disturbed
+      byte[] copy = new byte[buffer.remaining()];
+      buffer.duplicate().get(copy);
+      out.write(copy);
+    }
+  }
+
+  /**
+   * Reverses {@link #compressEntries(List, ICompressor)}: decompresses {@code buffer} and splits the
+   * result into one buffer per entry.
+   *
+   * <p>The returned buffers are slices of a single freshly allocated array, in the order the
+   * entries were packed. The input buffer's position is not modified. Heap, direct and read-only
+   * input buffers are all accepted.
+   *
+   * @param buffer compressed payload; its remaining bytes are decompressed
+   * @param unCompressor decompressor matching the compressor used by the sender
+   * @return one buffer per packed entry
+   * @throws IOException if decompression fails or the decompressed stream is malformed (missing
+   *     header, negative or out-of-range entry count or length)
+   */
+  public static List<ByteBuffer> decompressEntries(ByteBuffer buffer, IUnCompressor unCompressor)
+      throws IOException {
+    int srcLength = buffer.remaining();
+    byte[] src;
+    int srcOffset;
+    if (buffer.hasArray()) {
+      src = buffer.array();
+      srcOffset = buffer.arrayOffset() + buffer.position();
+    } else {
+      // direct or read-only buffers expose no array; copy via a duplicate to keep position intact
+      src = new byte[srcLength];
+      buffer.duplicate().get(src);
+      srcOffset = 0;
+    }
+
+    int uncompressedLength = unCompressor.getUncompressedLength(src, srcOffset, srcLength);
+    // even an empty batch carries its 4-byte entry count
+    if (uncompressedLength < Integer.BYTES) {
+      throw new IOException(
+          "Invalid uncompressed length " + uncompressedLength + " for compressed entries");
+    }
+    byte[] uncompressed = new byte[uncompressedLength];
+    unCompressor.uncompress(src, srcOffset, srcLength, uncompressed, 0);
+    ByteBuffer uncompressedBuffer = ByteBuffer.wrap(uncompressed);
+
+    int count = uncompressedBuffer.getInt();
+    // every entry needs at least its 4-byte length prefix, which bounds the plausible count and
+    // keeps a corrupted header from triggering a huge list preallocation
+    if (count < 0 || count > uncompressedBuffer.remaining() / Integer.BYTES) {
+      throw new IOException(
+          "Invalid entry count " + count + " in " + uncompressedLength + " decompressed bytes");
+    }
+    List<ByteBuffer> buffers = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      if (uncompressedBuffer.remaining() < Integer.BYTES) {
+        throw new IOException("Truncated length prefix for entry " + i + " of " + count);
+      }
+      int size = uncompressedBuffer.getInt();
+      if (size < 0 || size > uncompressedBuffer.remaining()) {
+        throw new IOException(
+            "Invalid size "
+                + size
+                + " for entry "
+                + i
+                + ", only "
+                + uncompressedBuffer.remaining()
+                + " bytes left");
+      }
+      ByteBuffer slice = uncompressedBuffer.slice();
+      slice.limit(size);
+      buffers.add(slice);
+      uncompressedBuffer.position(uncompressedBuffer.position() + size);
+    }
+
+    return buffers;
+  }
+
+ public static List<Entry> parseEntries(List<ByteBuffer> buffers) throws UnknownLogTypeException {
+ List<Entry> entries = new ArrayList<>();
+ for (ByteBuffer buffer : buffers) {
+ buffer.mark();
+ Entry e;
+ try {
+ e = LogParser.getINSTANCE().parse(buffer);
+ e.setByteSize(buffer.limit() - buffer.position());
+ } catch (BufferUnderflowException ex) {
+ buffer.reset();
+ throw ex;
+ }
+ entries.add(e);
+ }
+ return entries;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index ba3a85ac31..df654f5bfd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -103,6 +103,12 @@ public class Timer {
TIME_SCALE,
true,
DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+ // Sender side: time spent sending log entries to a single follower (uses TIME_SCALE like the
+ // other sender timing statistics in this group).
+ RAFT_SENDER_SEND_LOG(
+ RAFT_MEMBER_SENDER,
+ "send log to a follower",
+ TIME_SCALE,
+ true,
+ DATA_GROUP_MEMBER_LOCAL_EXECUTION),
RAFT_SENDER_BUILD_LOG_REQUEST(
RAFT_MEMBER_SENDER,
"build SendLogRequest",
@@ -184,6 +190,12 @@ public class Timer {
RAFT_SENDER_DATA_LOG_APPLY(
RAFT_MEMBER_SENDER, "apply data log", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
// raft member - receiver
+ // Receiver side: time spent decompressing the entryBytes of an AppendCompressedEntriesRequest,
+ // measured around LogUtils.decompressEntries in the RPC handler for compressed appends.
+ // NOTE(review): the last argument (presumably the parent node when the statistic tree is printed)
+ // is META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP rather than a receiver-side node;
+ // confirm this is intended.
+ RAFT_RECEIVER_DECOMPRESS_ENTRY(
+ RAFT_MEMBER_RECEIVER,
+ "receiver decompress entries",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
RAFT_MEMBER_RECEIVER,
"receiver wait for prev log",
@@ -302,6 +314,18 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ // Filled by LogUtils.compressEntries: accumulated size in bytes of the packed entry batches
+ // before compression. Scale is 1, presumably because this sums byte counts, not durations.
+ LOG_DISPATCHER_RAW_SIZE(
+ LOG_DISPATCHER,
+ "raw dispatching size",
+ 1,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ // Filled by LogUtils.compressEntries: accumulated size in bytes of the same batches after
+ // compression. COMPRESSED_SIZE sum / RAW_SIZE sum is printed as "Dispatcher compression ratio".
+ LOG_DISPATCHER_COMPRESSED_SIZE(
+ LOG_DISPATCHER,
+ "compressed dispatching size",
+ 1,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
@@ -421,8 +445,12 @@ public class Timer {
if (!ENABLE_INSTRUMENTING) {
return "";
}
- StringBuilder result = new StringBuilder();
+ StringBuilder result = new StringBuilder("\n");
printTo(Statistic.ROOT, result);
+ result
+ .append("Dispatcher compression ratio: ")
+ .append(LOG_DISPATCHER_COMPRESSED_SIZE.getSum() * 1.0 / LOG_DISPATCHER_RAW_SIZE.getSum())
+ .append("\n");
return result.toString();
}
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index e2e6fa7e80..67466b2080 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -31,6 +31,18 @@ struct AppendEntriesRequest {
8: required i32 leaderId
}
+/**
+ * Compressed counterpart of AppendEntriesRequest. Instead of a list of per-entry binaries, all
+ * entries travel in entryBytes as one compressed blob. The receiver decompresses it, restores the
+ * entry list and then processes the request like a regular appendEntries call.
+ */
+struct AppendCompressedEntriesRequest {
+ 1: required i64 term // leader's
+ 2: required common.TEndPoint leader
+ 3: required binary entryBytes // compressed: i32 entry count, then (i32 length, bytes) per entry
+ 4: required i64 prevLogIndex
+ 5: required i64 prevLogTerm
+ 6: required i64 leaderCommit
+ 7: required common.TConsensusGroupId groupId
+ 8: required i32 leaderId
+ // Codec used for entryBytes. The receiver decodes it as CompressionType.values()[compressionType],
+ // i.e. it is the enum ordinal. NOTE(review): ordinal-based encoding breaks wire compatibility if
+ // CompressionType is ever reordered; consider sending the type's stable serialized id instead.
+ 9: required i8 compressionType
+}
+
struct AppendEntryResult {
1: required i64 status;
2: optional i64 lastLogTerm;
@@ -151,6 +163,8 @@ service RaftService {
**/
AppendEntryResult appendEntries(1:AppendEntriesRequest request)
+ /**
+ * Compressed variant of appendEntries: the receiver decompresses request.entryBytes with the codec
+ * given by request.compressionType and then handles the result as an ordinary appendEntries call.
+ **/
+ AppendEntryResult appendCompressedEntries(1:AppendCompressedEntriesRequest request)
+
common.TSStatus sendSnapshot(1:SendSnapshotRequest request)
/**