You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/28 01:20:34 UTC
[iotdb] branch native_raft updated: improve lock utilization
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 411c2852c3 improve lock utilization
411c2852c3 is described below
commit 411c2852c3eb11bf40e8cb55ad89e1fa5eacf3b4
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Mar 28 09:22:29 2023 +0800
improve lock utilization
---
.../iotdb/consensus/config/ConsensusConfig.java | 2 +
.../apache/iotdb/consensus/config/RPCConfig.java | 60 +++++++++++++---
.../iotdb/consensus/natraft/RaftConsensus.java | 3 +
.../natraft/client/AsyncRaftServiceClient.java | 18 ++++-
.../natraft/client/RaftConsensusClientPool.java | 4 +-
.../consensus/natraft/protocol/RaftConfig.java | 27 ++++++++
.../consensus/natraft/protocol/RaftMember.java | 41 +++++------
.../protocol/heartbeat/HeartbeatReqHandler.java | 6 +-
.../consensus/natraft/protocol/log/Entry.java | 21 +++++-
.../consensus/natraft/protocol/log/LogParser.java | 8 ++-
.../natraft/protocol/log/VotingEntry.java | 21 ------
.../protocol/log/appender/BlockingLogAppender.java | 70 +++++++------------
.../natraft/protocol/log/appender/LogAppender.java | 3 +-
.../log/appender/SlidingWindowLogAppender.java | 78 +++++++++------------
.../natraft/protocol/log/applier/BaseApplier.java | 8 ++-
.../protocol/log/catchup/LogCatchUpTask.java | 12 ----
.../log/dispatch/AppendNodeEntryHandler.java | 43 +++++++-----
.../protocol/log/dispatch/LogDispatcher.java | 39 ++---------
.../protocol/log/logtype/ConfigChangeEntry.java | 4 +-
.../natraft/protocol/log/logtype/EmptyEntry.java | 4 +-
.../natraft/protocol/log/logtype/RequestEntry.java | 4 +-
.../protocol/log/manager/RaftLogManager.java | 80 +++++++++++++++++-----
.../serialization/SyncLogDequeSerializer.java | 2 +-
.../protocol/log/snapshot/DirectorySnapshot.java | 8 ++-
.../natraft/service/RaftRPCServiceProcessor.java | 4 +-
.../iotdb/consensus/natraft/utils/LogUtils.java | 20 ++----
.../iotdb/consensus/natraft/utils/Timer.java | 36 ++++++++++
.../db/mpp/plan/parser/StatementGenerator.java | 13 ++--
.../planner/plan/node/write/InsertTabletNode.java | 4 +-
.../apache/iotdb/db/utils/QueryDataSetUtils.java | 31 ++++++++-
.../org/apache/iotdb/db/utils/SetThreadName.java | 22 +++---
.../java/org/apache/iotdb/session/Session.java | 17 ++++-
.../apache/iotdb/session/util/SessionUtils.java | 23 +++++++
thrift-raft/src/main/thrift/raft.thrift | 27 +++-----
thrift/src/main/thrift/client.thrift | 2 +
35 files changed, 469 insertions(+), 296 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
index fed43984fe..8740be6857 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/ConsensusConfig.java
@@ -57,6 +57,8 @@ public class ConsensusConfig {
.setRpcMaxConcurrentClientNum(
ioTConsensusConfig.getRpc().getRpcMaxConcurrentClientNum())
.setThriftMaxFrameSize(ioTConsensusConfig.getRpc().getThriftMaxFrameSize())
+ .setCoreClientNumForEachNode(ioTConsensusConfig.getRpc().getCoreClientNumForEachNode())
+ .setMaxClientNumForEachNode(ioTConsensusConfig.getRpc().getMaxClientNumForEachNode())
.build();
this.properties = properties;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java
index 495519cdfe..98c29bbf0a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RPCConfig.java
@@ -4,17 +4,21 @@
package org.apache.iotdb.consensus.config;
+import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
+
import java.util.concurrent.TimeUnit;
public class RPCConfig {
- private final int rpcSelectorThreadNum;
- private final int rpcMinConcurrentClientNum;
- private final int rpcMaxConcurrentClientNum;
- private final int thriftServerAwaitTimeForStopService;
- private final boolean isRpcThriftCompressionEnabled;
- private final int selectorNumOfClientManager;
- private final int connectionTimeoutInMs;
- private final int thriftMaxFrameSize;
+ private int rpcSelectorThreadNum;
+ private int rpcMinConcurrentClientNum;
+ private int rpcMaxConcurrentClientNum;
+ private int thriftServerAwaitTimeForStopService;
+ private boolean isRpcThriftCompressionEnabled;
+ private int selectorNumOfClientManager;
+ private int connectionTimeoutInMs;
+ private int thriftMaxFrameSize;
+ private int coreClientNumForEachNode;
+ private int maxClientNumForEachNode;
private RPCConfig(
int rpcSelectorThreadNum,
@@ -24,7 +28,9 @@ public class RPCConfig {
boolean isRpcThriftCompressionEnabled,
int selectorNumOfClientManager,
int connectionTimeoutInMs,
- int thriftMaxFrameSize) {
+ int thriftMaxFrameSize,
+ int coreClientNumForEachNode,
+ int maxClientNumForEachNode) {
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
this.rpcMinConcurrentClientNum = rpcMinConcurrentClientNum;
this.rpcMaxConcurrentClientNum = rpcMaxConcurrentClientNum;
@@ -33,6 +39,8 @@ public class RPCConfig {
this.selectorNumOfClientManager = selectorNumOfClientManager;
this.connectionTimeoutInMs = connectionTimeoutInMs;
this.thriftMaxFrameSize = thriftMaxFrameSize;
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
}
public int getRpcSelectorThreadNum() {
@@ -81,6 +89,10 @@ public class RPCConfig {
private int connectionTimeoutInMs = (int) TimeUnit.SECONDS.toMillis(20);
private int thriftMaxFrameSize = 536870912;
+ private int coreClientNumForEachNode = DefaultProperty.CORE_CLIENT_NUM_FOR_EACH_NODE;
+
+ private int maxClientNumForEachNode = DefaultProperty.MAX_CLIENT_NUM_FOR_EACH_NODE;
+
public RPCConfig.Builder setRpcSelectorThreadNum(int rpcSelectorThreadNum) {
this.rpcSelectorThreadNum = rpcSelectorThreadNum;
return this;
@@ -122,6 +134,16 @@ public class RPCConfig {
return this;
}
+ public RPCConfig.Builder setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ return this;
+ }
+
+ public RPCConfig.Builder setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
+ return this;
+ }
+
public RPCConfig build() {
return new RPCConfig(
rpcSelectorThreadNum,
@@ -131,7 +153,25 @@ public class RPCConfig {
isRpcThriftCompressionEnabled,
selectorNumOfClientManager,
connectionTimeoutInMs,
- thriftMaxFrameSize);
+ thriftMaxFrameSize,
+ coreClientNumForEachNode,
+ maxClientNumForEachNode);
}
}
+
+ public int getCoreClientNumForEachNode() {
+ return coreClientNumForEachNode;
+ }
+
+ public void setCoreClientNumForEachNode(int coreClientNumForEachNode) {
+ this.coreClientNumForEachNode = coreClientNumForEachNode;
+ }
+
+ public int getMaxClientNumForEachNode() {
+ return maxClientNumForEachNode;
+ }
+
+ public void setMaxClientNumForEachNode(int maxClientNumForEachNode) {
+ this.maxClientNumForEachNode = maxClientNumForEachNode;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index a07afb1e97..ac1f88d2a1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -156,6 +156,9 @@ public class RaftConsensus implements IConsensus {
@Override
public ConsensusWriteResponse write(ConsensusGroupId groupId, IConsensusRequest request) {
+ if (config.isOnlyTestNetwork()) {
+ return ConsensusWriteResponse.newBuilder().setStatus(StatusUtils.OK).build();
+ }
RaftMember impl = stateMachineMap.get(groupId);
if (impl == null) {
return ConsensusWriteResponse.newBuilder()
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
index 43f70d2b04..3533c90585 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicInteger;
public class AsyncRaftServiceClient extends RaftService.AsyncClient {
@@ -44,6 +45,8 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
private final TEndPoint endpoint;
private final ClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
+ private static final AtomicInteger createCnt = new AtomicInteger();
+ private static final AtomicInteger closeCnt = new AtomicInteger();
public AsyncRaftServiceClient(
TProtocolFactory protocolFactory,
@@ -58,11 +61,17 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
TNonblockingSocketWrapper.wrap(endpoint.getIp(), endpoint.getPort(), connectionTimeout));
this.endpoint = endpoint;
this.clientManager = clientManager;
+ if (createCnt.incrementAndGet() % 1000 == 0) {
+ logger.info("Created {} clients", createCnt.get(), new Exception());
+ }
}
public void close() {
___transport.close();
___currentMethod = null;
+ if (closeCnt.incrementAndGet() % 1000 == 0) {
+ logger.info("Closed {} clients", closeCnt.get(), new Exception());
+ }
}
/**
@@ -110,6 +119,13 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
} catch (Exception e) {
if (!(e.getCause() instanceof ConnectException)) {
logger.info("Unexpected exception occurs in {} :", this, e);
+ } else {
+ logger.debug("Cannot connect: {}", e.getMessage());
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it
+ }
}
return false;
}
@@ -117,7 +133,7 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
@Override
public String toString() {
- return String.format("AsyncConfigNodeIServiceClient{%s}", endpoint);
+ return String.format("AsyncRaftServiceClient{%s}", endpoint);
}
public static class Factory extends AsyncThriftClientFactory<TEndPoint, AsyncRaftServiceClient> {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
index d393a5c8b9..87db31f4c0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
@@ -58,8 +58,8 @@ public class RaftConsensusClientPool {
.build(),
RAFT_CONSENSUS_CLIENT_POOL_THREAD_NAME),
new ClientPoolProperty.Builder<AsyncRaftServiceClient>()
- .setMaxClientNumForEachNode(config.getMaxClientPerNode())
- .setCoreClientNumForEachNode(config.getMaxIdleClientPerNode())
+ .setMaxClientNumForEachNode(config.getRpcConfig().getMaxClientNumForEachNode())
+ .setCoreClientNumForEachNode(config.getRpcConfig().getCoreClientNumForEachNode())
.build()
.getConfig());
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index 91ca23a15f..3d2991da3e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -69,6 +69,8 @@ public class RaftConfig {
private long maxSyncLogLag = 100_000;
private int syncLeaderMaxWaitMs = 30_000;
private boolean enableCompressedDispatching = true;
+ private boolean ignoreStateMachine = false;
+ private boolean onlyTestNetwork = false;
private CompressionType dispatchingCompressionType = CompressionType.SNAPPY;
private ConsistencyLevel consistencyLevel = ConsistencyLevel.STRONG_CONSISTENCY;
private RPCConfig rpcConfig;
@@ -374,6 +376,14 @@ public class RaftConfig {
this.enableCompressedDispatching = enableCompressedDispatching;
}
+ public boolean isOnlyTestNetwork() {
+ return onlyTestNetwork;
+ }
+
+ public void setOnlyTestNetwork(boolean onlyTestNetwork) {
+ this.onlyTestNetwork = onlyTestNetwork;
+ }
+
public CompressionType getDispatchingCompressionType() {
return dispatchingCompressionType;
}
@@ -382,6 +392,14 @@ public class RaftConfig {
this.dispatchingCompressionType = dispatchingCompressionType;
}
+ public boolean isIgnoreStateMachine() {
+ return ignoreStateMachine;
+ }
+
+ public void setIgnoreStateMachine(boolean ignoreStateMachine) {
+ this.ignoreStateMachine = ignoreStateMachine;
+ }
+
public void loadProperties(Properties properties) {
logger.debug("Loading properties: {}", properties);
@@ -565,6 +583,15 @@ public class RaftConfig {
properties.getProperty(
"default_boolean_encoding", this.getDispatchingCompressionType().toString())));
+ this.setIgnoreStateMachine(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "ignore_state_machine", String.valueOf(this.isIgnoreStateMachine()))));
+
+ this.setOnlyTestNetwork(
+ Boolean.parseBoolean(
+ properties.getProperty("only_test_network", String.valueOf(this.isOnlyTestNetwork()))));
+
String consistencyLevel = properties.getProperty("consistency_level");
if (consistencyLevel != null) {
this.setConsistencyLevel(ConsistencyLevel.getConsistencyLevel(consistencyLevel));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 4aef915e65..5271e793aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -175,6 +175,8 @@ public class RaftMember {
/** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
private ExecutorService commitLogPool;
+ private long lastCommitTaskTime;
+
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
* which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
@@ -448,7 +450,9 @@ public class RaftMember {
}
public void tryUpdateCommitIndex(long leaderTerm, long commitIndex, long commitTerm) {
- if (leaderTerm >= status.term.get() && logManager.getCommitLogIndex() < commitIndex) {
+ if (leaderTerm >= status.term.get()
+ && logManager.getCommitLogIndex() < commitIndex
+ && System.currentTimeMillis() - lastCommitTaskTime > 100) {
// there are more local logs that can be committed, commit them in a ThreadPool so the
// heartbeat response will not be blocked
CommitLogTask commitLogTask = new CommitLogTask(logManager, commitIndex, commitTerm);
@@ -457,18 +461,8 @@ public class RaftMember {
// node catch up
if (commitLogPool != null && !commitLogPool.isShutdown()) {
commitLogPool.submit(commitLogTask);
+ lastCommitTaskTime = System.currentTimeMillis();
}
-
- logger.debug(
- "{}: Inconsistent log found, leaderCommit: {}-{}, localCommit: {}-{}, "
- + "localLast: {}-{}",
- name,
- commitIndex,
- commitTerm,
- logManager.getCommitLogIndex(),
- logManager.getCommitLogTerm(),
- logManager.getLastLogIndex(),
- logManager.getLastLogTerm());
}
}
@@ -518,11 +512,9 @@ public class RaftMember {
}
AppendEntryResult response;
- List<Entry> entries = LogUtils.parseEntries(request.entries);
+ List<Entry> entries = LogUtils.parseEntries(request.entries, stateMachine);
- response =
- logAppender.appendEntries(
- request.prevLogIndex, request.prevLogTerm, request.leaderCommit, request.term, entries);
+ response = logAppender.appendEntries(request.leaderCommit, request.term, entries);
if (logger.isDebugEnabled()) {
logger.debug(
@@ -590,6 +582,10 @@ public class RaftMember {
return allNodes;
}
+ public RaftConfig getConfig() {
+ return config;
+ }
+
protected enum AppendLogResult {
OK,
TIME_OUT,
@@ -609,7 +605,7 @@ public class RaftMember {
if (!isLeader()) {
Peer leader = getLeader();
if (leader == null) {
- return StatusUtils.NO_LEADER;
+ return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader in: " + groupId);
} else if (request != null) {
return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
} else {
@@ -655,7 +651,11 @@ public class RaftMember {
return StatusUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT);
}
- return waitForEntryResult(votingEntry);
+ TSStatus tsStatus1 = waitForEntryResult(votingEntry);
+ entry.waitEndTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
+ entry.waitEndTime - entry.createTime);
+ return tsStatus1;
}
protected void waitApply(Entry entry) throws LogExecutionException {
@@ -672,9 +672,6 @@ public class RaftMember {
}
}
}
- entry.waitEndTime = System.nanoTime();
- Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
- entry.waitEndTime - entry.createTime);
if (entry.getException() != null) {
throw new LogExecutionException(entry.getException());
}
@@ -974,7 +971,7 @@ public class RaftMember {
public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
if (node == null || node.equals(thisNode.getEndpoint())) {
logger.debug("{}: plan {} has no where to be forwarded", name, plan);
- return StatusUtils.NO_LEADER;
+ return StatusUtils.NO_LEADER.deepCopy().setMessage("No leader to forward in: " + groupId);
}
logger.debug("{}: Forward {} to node {}", name, plan, node);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
index 615a34224b..f7c30e60aa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.raft.thrift.HeartBeatRequest;
import org.apache.iotdb.consensus.raft.thrift.HeartBeatResponse;
@@ -92,8 +93,9 @@ public class HeartbeatReqHandler {
response.setTerm(Response.RESPONSE_AGREE);
// tell the leader the local log progress, so it may decide whether to perform a catch-up
- response.setLastLogIndex(member.getLogManager().getLastLogIndex());
- response.setLastLogTerm(member.getLogManager().getLastLogTerm());
+ Entry lastEntry = member.getLogManager().getLastEntryUnsafe();
+ response.setLastLogIndex(lastEntry.getCurrLogIndex());
+ response.setLastLogTerm(lastEntry.getCurrLogTerm());
response.setCommitIndex(member.getLogManager().getCommitLogIndex());
// if the snapshot apply lock is held, it means that a snapshot is installing now.
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index fec0bc7035..208678c6b2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -55,12 +55,23 @@ public abstract class Entry implements Comparable<Entry> {
public long committedTime;
public long applyTime;
public long waitEndTime;
+ private ByteBuffer serializationCache;
public int getDefaultSerializationBufferSize() {
return DEFAULT_SERIALIZATION_BUFFER_SIZE;
}
- public abstract ByteBuffer serialize();
+ protected abstract ByteBuffer serializeInternal();
+
+ public ByteBuffer serialize() {
+ ByteBuffer cache = serializationCache;
+ if (cache != null) {
+ return cache.slice();
+ }
+ ByteBuffer byteBuffer = serializeInternal();
+ serializationCache = byteBuffer;
+ return byteBuffer.slice();
+ }
public abstract void deserialize(ByteBuffer buffer);
@@ -167,4 +178,12 @@ public abstract class Entry implements Comparable<Entry> {
public void setFromThisNode(boolean fromThisNode) {
this.fromThisNode = fromThisNode;
}
+
+ public ByteBuffer getSerializationCache() {
+ return serializationCache;
+ }
+
+ public void setSerializationCache(ByteBuffer serializationCache) {
+ this.serializationCache = serializationCache;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
index 84ffc3b59e..3f49a6576f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/LogParser.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry.Types;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
@@ -43,7 +44,7 @@ public class LogParser {
return INSTANCE;
}
- public Entry parse(ByteBuffer buffer) throws UnknownLogTypeException {
+ public Entry parse(ByteBuffer buffer, IStateMachine stateMachine) throws UnknownLogTypeException {
if (logger.isDebugEnabled()) {
logger.debug("Received a log buffer, pos:{}, limit:{}", buffer.position(), buffer.limit());
}
@@ -55,11 +56,15 @@ public class LogParser {
throw new UnknownLogTypeException(typeInt);
}
logger.debug("The log type is {}", type);
+ int startPos = buffer.position();
Entry log;
switch (type) {
case CLIENT_REQUEST:
RequestEntry requestLog = new RequestEntry();
requestLog.deserialize(buffer);
+ if (stateMachine != null) {
+ requestLog.setRequest(stateMachine.deserializeRequest(requestLog.getRequest()));
+ }
log = requestLog;
break;
case EMPTY:
@@ -70,6 +75,7 @@ public class LogParser {
default:
throw new IllegalArgumentException(type.toString());
}
+ log.setByteSize(buffer.position() - startPos);
logger.debug("Parsed a log {}", log);
return log;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
index 0d6cf613bf..b654a3a872 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -23,22 +23,18 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.Future;
public class VotingEntry {
protected Entry entry;
// for NB-Raft
protected Set<Peer> weaklyAcceptedNodes;
- private boolean hasFailed;
private AppendEntryRequest appendEntryRequest;
- private Future<ByteBuffer> serializedLogFuture;
protected List<Peer> currNodes;
protected List<Peer> newNodes;
private boolean isStronglyAccepted;
@@ -59,15 +55,6 @@ public class VotingEntry {
this.newNodes = newNodes;
}
- public VotingEntry(VotingEntry another) {
- this.entry = another.entry;
- this.weaklyAcceptedNodes = another.weaklyAcceptedNodes;
- this.setAppendEntryRequest(another.appendEntryRequest);
- this.setSerializedLogFuture(another.getSerializedLogFuture());
- this.currNodes = another.currNodes;
- this.newNodes = another.newNodes;
- }
-
public Entry getEntry() {
return entry;
}
@@ -97,14 +84,6 @@ public class VotingEntry {
this.appendEntryRequest = appendEntryRequest;
}
- public Future<ByteBuffer> getSerializedLogFuture() {
- return serializedLogFuture;
- }
-
- public void setSerializedLogFuture(Future<ByteBuffer> serializedLogFuture) {
- this.serializedLogFuture = serializedLogFuture;
- }
-
public int currNodesQuorumNum() {
return currNodes.size() / 2 + 1;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 2496ce316b..c6333b7e75 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -82,15 +82,12 @@ public class BlockingLogAppender implements LogAppender {
return alreadyWait <= config.getWriteOperationTimeoutMS();
}
- protected long checkPrevLogIndex(long prevLogIndex) {
+ protected boolean checkPrevLogIndex(long prevLogIndex) {
long lastLogIndex = logManager.getLastLogIndex();
- if (lastLogIndex < prevLogIndex && !waitForPrevLog(prevLogIndex)) {
- // there are logs missing between the incoming log and the local last log, and such logs
- // did not come within a timeout, report a mismatch to the sender and it shall fix this
- // through catch-up
- return Response.RESPONSE_LOG_MISMATCH;
- }
- return Response.RESPONSE_AGREE;
+ // there are logs missing between the incoming log and the local last log, and such logs
+ // did not come within a timeout, report a mismatch to the sender and it shall fix this
+ // through catch-up
+ return lastLogIndex >= prevLogIndex || waitForPrevLog(prevLogIndex);
}
/**
@@ -101,25 +98,19 @@ public class BlockingLogAppender implements LogAppender {
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
* .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
- public AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> logs) {
- logger.debug(
- "{}, prevLogIndex={}, prevLogTerm={}, leaderCommit={}",
- member.getName(),
- prevLogIndex,
- prevLogTerm,
- leaderCommit);
+ public AppendEntryResult appendEntries(long leaderCommit, long term, List<Entry> logs) {
+ logger.debug("{}, entries={}, leaderCommit={}", member.getName(), logs, leaderCommit);
if (logs.isEmpty()) {
return new AppendEntryResult(Response.RESPONSE_AGREE)
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
}
long startTime = Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.getOperationStartTime();
- long resp = checkPrevLogIndex(prevLogIndex);
+ boolean resp = checkPrevLogIndex(logs.get(0).getCurrLogIndex() - 1);
Statistic.RAFT_RECEIVER_WAIT_FOR_PREV_LOG.calOperationCostTimeFromStart(startTime);
- if (resp != Response.RESPONSE_AGREE) {
- return new AppendEntryResult(resp)
+ if (!resp) {
+ return new AppendEntryResult(Response.RESPONSE_LOG_MISMATCH)
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
}
@@ -136,12 +127,11 @@ public class BlockingLogAppender implements LogAppender {
while (true) {
if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
- resp =
- lastConfigEntry == null
- ? appendWithoutConfigChange(
- prevLogIndex, prevLogTerm, leaderCommit, term, logs, result)
- : appendWithConfigChange(
- prevLogIndex, prevLogTerm, leaderCommit, term, logs, result, lastConfigEntry);
+ if (lastConfigEntry == null) {
+ appendWithoutConfigChange(leaderCommit, term, logs, result);
+ } else {
+ appendWithConfigChange(leaderCommit, term, logs, result, lastConfigEntry);
+ }
break;
}
@@ -161,20 +151,18 @@ public class BlockingLogAppender implements LogAppender {
return result;
}
- protected long appendWithConfigChange(
- long prevLogIndex,
- long prevLogTerm,
+ protected boolean appendWithConfigChange(
long leaderCommit,
long term,
List<Entry> logs,
AppendEntryResult result,
ConfigChangeEntry configChangeEntry) {
- long resp;
+ boolean resp;
try {
logManager.getLock().writeLock().lock();
- resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
+ resp = logManager.maybeAppend(logs);
- if (resp != -1) {
+ if (resp) {
if (logger.isDebugEnabled()) {
logger.debug(
"{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
@@ -182,9 +170,8 @@ public class BlockingLogAppender implements LogAppender {
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
member.setNewNodes(configChangeEntry.getNewPeers());
- member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
+ member.tryUpdateCommitIndex(term, leaderCommit, -1);
} else {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -195,24 +182,17 @@ public class BlockingLogAppender implements LogAppender {
return resp;
}
- protected long appendWithoutConfigChange(
- long prevLogIndex,
- long prevLogTerm,
- long leaderCommit,
- long term,
- List<Entry> logs,
- AppendEntryResult result) {
- long resp = logManager.maybeAppend(prevLogIndex, prevLogTerm, logs);
- if (resp != -1) {
+ protected boolean appendWithoutConfigChange(
+ long leaderCommit, long term, List<Entry> logs, AppendEntryResult result) {
+ boolean resp = logManager.maybeAppend(logs);
+ if (resp) {
if (logger.isDebugEnabled()) {
logger.debug(
"{} append a new log list {}, commit to {}", member.getName(), logs, leaderCommit);
}
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
- member.tryUpdateCommitIndex(term, leaderCommit, logManager.getTerm(leaderCommit));
-
+ member.tryUpdateCommitIndex(term, leaderCommit, -1);
} else {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
index 1d0bb8f56d..cccd152448 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/LogAppender.java
@@ -30,8 +30,7 @@ import java.util.List;
*/
public interface LogAppender {
- AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries);
+ AppendEntryResult appendEntries(long leaderCommit, long term, List<Entry> entries);
void reset();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 6157ab6ea1..39b58cd424 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
import org.apache.iotdb.consensus.natraft.utils.Response;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.slf4j.Logger;
@@ -33,6 +34,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
public class SlidingWindowLogAppender implements LogAppender {
@@ -42,18 +44,16 @@ public class SlidingWindowLogAppender implements LogAppender {
private int windowLength = 0;
private Entry[] logWindow;
private long firstPosPrevIndex;
- private long[] prevTerms;
-
private RaftMember member;
private RaftLogManager logManager;
private RaftConfig config;
+ private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public SlidingWindowLogAppender(RaftMember member, RaftConfig config) {
this.member = member;
this.logManager = member.getLogManager();
windowCapacity = config.getMaxNumOfLogsInMem();
logWindow = new Entry[windowCapacity];
- prevTerms = new long[windowCapacity];
this.config = config;
reset();
}
@@ -69,8 +69,8 @@ public class SlidingWindowLogAppender implements LogAppender {
private void checkLogPrev(int pos) {
// check the previous entry
- long prevLogTerm = prevTerms[pos];
- if (pos > 0) {
+ long prevLogTerm = logWindow[pos].getPrevTerm();
+ if (pos > 0 && logWindow[pos - 1] != null) {
Entry prev = logWindow[pos - 1];
if (prev != null && prev.getCurrLogTerm() != prevLogTerm) {
logWindow[pos - 1] = null;
@@ -82,8 +82,8 @@ public class SlidingWindowLogAppender implements LogAppender {
// check the next entry
Entry entry = logWindow[pos];
boolean nextMismatch = false;
- if (pos < windowCapacity - 1) {
- long nextPrevTerm = prevTerms[pos + 1];
+ if (pos < windowCapacity - 1 && logWindow[pos + 1] != null) {
+ long nextPrevTerm = logWindow[pos + 1].getPrevTerm();
if (nextPrevTerm != entry.getCurrLogTerm()) {
nextMismatch = true;
}
@@ -106,9 +106,7 @@ public class SlidingWindowLogAppender implements LogAppender {
* Flush window range [0, flushPos) into the LogManager, where flushPos is the first null position
* in the window.
*/
- private long flushWindow(AppendEntryResult result, long leaderCommit) {
- long windowPrevLogIndex = firstPosPrevIndex;
- long windowPrevLogTerm = prevTerms[0];
+ private boolean flushWindow(AppendEntryResult result) {
int flushPos = 0;
for (; flushPos < windowCapacity; flushPos++) {
@@ -126,14 +124,12 @@ public class SlidingWindowLogAppender implements LogAppender {
logs.get(logs.size() - 1));
long startWaitingTime = System.currentTimeMillis();
- long success;
+ boolean success;
while (true) {
// TODO: Consider memory footprint to execute a precise rejection
if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
- success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, logs);
- member.tryUpdateCommitIndex(
- member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
+ success = logManager.maybeAppend(logs);
break;
}
try {
@@ -141,24 +137,25 @@ public class SlidingWindowLogAppender implements LogAppender {
if (System.currentTimeMillis() - startWaitingTime
> config.getMaxWaitingTimeWhenInsertBlocked()) {
result.status = Response.RESPONSE_TOO_BUSY;
- return -1;
+ return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
- if (success != -1) {
+ if (success) {
moveWindowRightward(flushPos, logs.get(logs.size() - 1).getCurrLogIndex());
+ result.status = Response.RESPONSE_STRONG_ACCEPT;
+ return true;
+ } else {
+ result.status = Response.RESPONSE_LOG_MISMATCH;
+ logger.warn("Cannot flush the window to log");
+ return false;
}
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- result.setLastLogIndex(firstPosPrevIndex);
- result.setLastLogTerm(logManager.getLastLogTerm());
- return success;
}
private void moveWindowRightward(int step, long newIndex) {
System.arraycopy(logWindow, step, logWindow, 0, windowCapacity - step);
- System.arraycopy(prevTerms, step, prevTerms, 0, windowCapacity - step);
for (int i = 1; i <= step; i++) {
logWindow[windowCapacity - i] = null;
}
@@ -168,7 +165,6 @@ public class SlidingWindowLogAppender implements LogAppender {
private void moveWindowLeftward(int step) {
int length = Math.max(windowCapacity - step, 0);
System.arraycopy(logWindow, 0, logWindow, step, length);
- System.arraycopy(prevTerms, 0, prevTerms, step, length);
for (int i = 0; i < length; i++) {
logWindow[i] = null;
}
@@ -176,8 +172,7 @@ public class SlidingWindowLogAppender implements LogAppender {
}
@Override
- public AppendEntryResult appendEntries(
- long prevLogIndex, long prevLogTerm, long leaderCommit, long term, List<Entry> entries) {
+ public AppendEntryResult appendEntries(long leaderCommit, long term, List<Entry> entries) {
if (entries.isEmpty()) {
return new AppendEntryResult(Response.RESPONSE_AGREE)
.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -185,51 +180,44 @@ public class SlidingWindowLogAppender implements LogAppender {
AppendEntryResult result = null;
for (Entry entry : entries) {
- result = appendEntry(prevLogIndex, prevLogTerm, leaderCommit, entry);
+ result = appendEntry(leaderCommit, entry);
if (result.status != Response.RESPONSE_AGREE
&& result.status != Response.RESPONSE_STRONG_ACCEPT
&& result.status != Response.RESPONSE_WEAK_ACCEPT) {
return result;
}
- prevLogIndex = entry.getCurrLogIndex();
- prevLogTerm = entry.getCurrLogTerm();
}
return result;
}
- private AppendEntryResult appendEntry(
- long prevLogIndex, long prevLogTerm, long leaderCommit, Entry entry) {
- long appendedPos = 0;
+ private AppendEntryResult appendEntry(long leaderCommit, Entry entry) {
+ boolean appended = false;
AppendEntryResult result = new AppendEntryResult();
+ long prevLogIndex = entry.getCurrLogIndex() - 1;
+ long startTime = Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.getOperationStartTime();
synchronized (this) {
- int windowPos = (int) (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
+ Statistic.RAFT_RECEIVER_WAIT_FOR_WINDOW.calOperationCostTimeFromStart(startTime);
+ int windowPos = (int) (prevLogIndex - firstPosPrevIndex);
if (windowPos < 0) {
// the new entry may replace an appended entry
- appendedPos =
- logManager.maybeAppend(prevLogIndex, prevLogTerm, Collections.singletonList(entry));
- member.tryUpdateCommitIndex(
- member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
- result.status = Response.RESPONSE_STRONG_ACCEPT;
- result.setLastLogIndex(logManager.getLastLogIndex());
- result.setLastLogTerm(logManager.getLastLogTerm());
+ appended = logManager.maybeAppend(Collections.singletonList(entry));
moveWindowLeftward(-windowPos);
+ result.status = appended ? Response.RESPONSE_STRONG_ACCEPT : Response.RESPONSE_LOG_MISMATCH;
} else if (windowPos < windowCapacity) {
// the new entry falls into the window
logWindow[windowPos] = entry;
- prevTerms[windowPos] = prevLogTerm;
if (windowLength < windowPos + 1) {
windowLength = windowPos + 1;
}
checkLog(windowPos);
if (windowPos == 0) {
- appendedPos = flushWindow(result, leaderCommit);
+ appended = flushWindow(result);
} else {
result.status = Response.RESPONSE_WEAK_ACCEPT;
}
-
} else {
result.setStatus(Response.RESPONSE_OUT_OF_WINDOW);
result.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
@@ -237,9 +225,9 @@ public class SlidingWindowLogAppender implements LogAppender {
}
}
- if (appendedPos == -1) {
- // the incoming log points to an illegal position, reject it
- result.status = Response.RESPONSE_LOG_MISMATCH;
+ if (appended) {
+ result.setLastLogIndex(logManager.getLastEntryIndexUnsafe());
+ member.tryUpdateCommitIndex(member.getStatus().getTerm().get(), leaderCommit, -1);
}
return result;
}
@@ -247,9 +235,7 @@ public class SlidingWindowLogAppender implements LogAppender {
@Override
public void reset() {
this.firstPosPrevIndex = logManager.getLastLogIndex();
- this.prevTerms[0] = logManager.getLastLogTerm();
logWindow = new Entry[windowCapacity];
- prevTerms = new long[windowCapacity];
windowLength = 0;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
index c38e5fc260..ed2b4dddba 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
@@ -53,9 +53,11 @@ public class BaseApplier implements LogApplier {
if (e instanceof RequestEntry) {
RequestEntry requestLog = (RequestEntry) e;
IConsensusRequest request = requestLog.getRequest();
- TSStatus status = applyRequest(request);
- if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- e.setException(new ConsensusException(status.message + ":" + status.code));
+ if (!member.getConfig().isIgnoreStateMachine()) {
+ TSStatus status = applyRequest(request);
+ if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ e.setException(new ConsensusException(status.message + ":" + status.code));
+ }
}
} else if (e instanceof ConfigChangeEntry) {
member.applyConfigChange(((ConfigChangeEntry) e));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
index 13e770f9e7..454ab7a6c9 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
@@ -79,18 +79,6 @@ public class LogCatchUpTask implements Callable<Boolean> {
request.setTerm(raftMember.getStatus().getTerm().get());
request.setEntries(logList);
- // set index for raft
- request.setPrevLogIndex(logs.get(startPos).getCurrLogIndex() - 1);
- if (startPos != 0) {
- request.setPrevLogTerm(logs.get(startPos - 1).getCurrLogTerm());
- } else {
- try {
- request.setPrevLogTerm(
- raftMember.getLogManager().getTerm(logs.get(0).getCurrLogIndex() - 1));
- } catch (Exception e) {
- logger.error("getTerm failed for newly append entries", e);
- }
- }
logger.debug("{}, node={} catchup request={}", raftMember.getName(), node, request);
return request;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 4d90e0d9b2..3c98156091 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.thrift.TApplicationException;
@@ -48,7 +49,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
private static final Logger logger = LoggerFactory.getLogger(AppendNodeEntryHandler.class);
protected RaftMember member;
- protected VotingEntry log;
+ protected VotingEntry votingEntry;
protected Peer directReceiver;
public AppendNodeEntryHandler() {}
@@ -64,13 +65,16 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
: directReceiver;
logger.debug(
- "{}: Append response {} from {} for log {}", member.getName(), response, trueReceiver, log);
+ "{}: Append response {} from {} for log {}",
+ member.getName(),
+ response,
+ trueReceiver,
+ votingEntry);
long resp = response.status;
if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
- member.getVotingLogList().onStronglyAccept(log, trueReceiver);
-
+ member.getVotingLogList().onStronglyAccept(votingEntry, trueReceiver);
member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
} else if (resp > 0) {
// a response > 0 is the follower's term
@@ -80,15 +84,18 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
member.getName(),
trueReceiver,
resp,
- log);
+ votingEntry);
member.stepDown(resp, null);
- synchronized (log) {
- log.notifyAll();
+ synchronized (votingEntry.getEntry()) {
+ votingEntry.getEntry().notifyAll();
}
} else if (resp == RESPONSE_WEAK_ACCEPT) {
- synchronized (log) {
- log.addWeaklyAcceptedNodes(trueReceiver);
- log.notifyAll();
+ votingEntry.getEntry().acceptedTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.add(
+ votingEntry.getEntry().acceptedTime - votingEntry.getEntry().createTime);
+ synchronized (votingEntry.getEntry()) {
+ votingEntry.addWeaklyAcceptedNodes(trueReceiver);
+ votingEntry.getEntry().notifyAll();
}
} else {
// e.g., Response.RESPONSE_LOG_MISMATCH
@@ -96,14 +103,14 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
logger.debug(
"{}: The log {} is rejected by {} because: {}",
member.getName(),
- log,
+ votingEntry,
trueReceiver,
resp);
} else {
logger.warn(
"{}: The log {} is rejected by {} because: {}",
member.getName(),
- log,
+ votingEntry,
trueReceiver,
resp);
}
@@ -124,17 +131,21 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
logger.debug(
"{}: Cannot append log {}: cannot connect to {}: {}",
member.getName(),
- log,
+ votingEntry,
directReceiver,
exception.getMessage());
} else {
logger.warn(
- "{}: Cannot append log {} to {}", member.getName(), log, directReceiver, exception);
+ "{}: Cannot append log {} to {}",
+ member.getName(),
+ votingEntry,
+ directReceiver,
+ exception);
}
}
- public void setLog(VotingEntry log) {
- this.log = log;
+ public void setVotingEntry(VotingEntry votingEntry) {
+ this.votingEntry = votingEntry;
}
public void setMember(RaftMember member) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 5b5995064c..463f9f0748 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -75,8 +75,6 @@ public class LogDispatcher {
protected Map<Peer, RateLimiter> nodesRateLimiter = new HashMap<>();
protected Map<Peer, Double> nodesRate = new HashMap<>();
protected Map<Peer, ExecutorService> executorServices = new HashMap<>();
- protected ExecutorService resultHandlerThread =
- IoTDBThreadPoolFactory.newFixedThreadPool(2, "AppendResultHandler");
protected boolean queueOrdered;
protected boolean enableCompressedDispatching;
protected ICompressor compressor;
@@ -91,9 +89,6 @@ public class LogDispatcher {
this.enableCompressedDispatching = config.isEnableCompressedDispatching();
this.compressor = ICompressor.getCompressor(config.getDispatchingCompressionType());
this.bindingThreadNum = config.getDispatcherBindingThreadNum();
- if (!queueOrdered) {
- maxBatchSize = 1;
- }
this.allNodes = member.getAllNodes();
this.newNodes = member.getNewNodes();
createQueueAndBindingThreads(unionNodes(allNodes, newNodes));
@@ -151,7 +146,6 @@ public class LogDispatcher {
logger.warn("Cannot shut down dispatcher pool of {}-{}", member.getName(), entry.getKey());
}
}
- resultHandlerThread.shutdownNow();
}
protected boolean addToQueue(BlockingQueue<VotingEntry> nodeLogQueue, VotingEntry request) {
@@ -238,7 +232,7 @@ public class LogDispatcher {
VotingEntry poll = logBlockingDeque.poll();
if (poll != null) {
currBatch.add(poll);
- logBlockingDeque.drainTo(currBatch, maxBatchSize);
+ logBlockingDeque.drainTo(currBatch, maxBatchSize - 1);
} else {
logBlockingDeque.wait(10);
continue;
@@ -309,48 +303,28 @@ public class LogDispatcher {
}
}
- protected AppendEntriesRequest prepareRequest(
- List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
+ protected AppendEntriesRequest prepareRequest(List<ByteBuffer> logList) {
AppendEntriesRequest request = new AppendEntriesRequest();
request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
request.setLeader(member.getThisNode().getEndpoint());
request.setLeaderId(member.getThisNode().getNodeId());
request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
request.setTerm(member.getStatus().getTerm().get());
-
request.setEntries(logList);
- // set index for raft
- request.setPrevLogIndex(currBatch.get(firstIndex).getEntry().getCurrLogIndex() - 1);
- try {
- request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
- } catch (Exception e) {
- logger.error("getTerm failed for newly append entries", e);
- }
return request;
}
- protected AppendCompressedEntriesRequest prepareCompressedRequest(
- List<ByteBuffer> logList, List<VotingEntry> currBatch, int firstIndex) {
+ protected AppendCompressedEntriesRequest prepareCompressedRequest(List<ByteBuffer> logList) {
AppendCompressedEntriesRequest request = new AppendCompressedEntriesRequest();
request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
request.setLeader(member.getThisNode().getEndpoint());
request.setLeaderId(member.getThisNode().getNodeId());
request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
-
request.setTerm(member.getStatus().getTerm().get());
-
request.setEntryBytes(LogUtils.compressEntries(logList, compressor));
request.setCompressionType((byte) compressor.getType().ordinal());
- // set index for raft
- request.setPrevLogIndex(currBatch.get(firstIndex).getEntry().getCurrLogIndex() - 1);
- try {
- request.setPrevLogTerm(currBatch.get(firstIndex).getAppendEntryRequest().prevLogTerm);
- } catch (Exception e) {
- logger.error("getTerm failed for newly append entries", e);
- }
return request;
}
@@ -383,11 +357,10 @@ public class LogDispatcher {
}
if (!enableCompressedDispatching) {
- AppendEntriesRequest appendEntriesRequest = prepareRequest(logList, currBatch, prevIndex);
+ AppendEntriesRequest appendEntriesRequest = prepareRequest(logList);
appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
} else {
- AppendCompressedEntriesRequest appendEntriesRequest =
- prepareCompressedRequest(logList, currBatch, prevIndex);
+ AppendCompressedEntriesRequest appendEntriesRequest = prepareCompressedRequest(logList);
appendEntriesAsync(logList, appendEntriesRequest, currBatch.subList(prevIndex, logIndex));
}
@@ -401,7 +374,7 @@ public class LogDispatcher {
public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setDirectReceiver(node);
- handler.setLog(log);
+ handler.setVotingEntry(log);
handler.setMember(member);
return handler;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 1e59fccc49..62cad1f6b8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -40,13 +40,14 @@ public class ConfigChangeEntry extends Entry {
}
@Override
- public ByteBuffer serialize() {
+ protected ByteBuffer serializeInternal() {
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream(getDefaultSerializationBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
dataOutputStream.writeLong(getCurrLogTerm());
+ dataOutputStream.writeLong(getPrevTerm());
dataOutputStream.writeInt(oldPeers.size());
for (Peer oldPeer : oldPeers) {
@@ -66,6 +67,7 @@ public class ConfigChangeEntry extends Entry {
public void deserialize(ByteBuffer buffer) {
setCurrLogIndex(buffer.getLong());
setCurrLogTerm(buffer.getLong());
+ setPrevTerm(buffer.getLong());
int size = buffer.getInt();
oldPeers = new ArrayList<>(size);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
index 28a8724115..db6d7f40da 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/EmptyEntry.java
@@ -36,13 +36,14 @@ public class EmptyEntry extends Entry {
}
@Override
- public ByteBuffer serialize() {
+ protected ByteBuffer serializeInternal() {
ByteArrayOutputStream byteArrayOutputStream =
new ByteArrayOutputStream(getDefaultSerializationBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) Types.EMPTY.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
dataOutputStream.writeLong(getCurrLogTerm());
+ dataOutputStream.writeLong(getPrevTerm());
} catch (IOException e) {
// unreachable
}
@@ -53,6 +54,7 @@ public class EmptyEntry extends Entry {
public void deserialize(ByteBuffer buffer) {
setCurrLogIndex(buffer.getLong());
setCurrLogTerm(buffer.getLong());
+ setPrevTerm(buffer.getLong());
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index a81bb5924b..42f1a2659f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -47,13 +47,14 @@ public class RequestEntry extends Entry {
}
@Override
- public ByteBuffer serialize() {
+ protected ByteBuffer serializeInternal() {
PublicBAOS byteArrayOutputStream = new PublicBAOS(getDefaultSerializationBufferSize());
try (DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream)) {
dataOutputStream.writeByte((byte) CLIENT_REQUEST.ordinal());
dataOutputStream.writeLong(getCurrLogIndex());
dataOutputStream.writeLong(getCurrLogTerm());
+ dataOutputStream.writeLong(getPrevTerm());
ByteBuffer byteBuffer = request.serializeToByteBuffer();
byteBuffer.rewind();
@@ -80,6 +81,7 @@ public class RequestEntry extends Entry {
public void deserialize(ByteBuffer buffer) {
setCurrLogIndex(buffer.getLong());
setCurrLogTerm(buffer.getLong());
+ setPrevTerm(buffer.getLong());
int len = buffer.getInt();
byte[] bytes = new byte[len];
buffer.get(bytes);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 5f2c4f6f04..aee2fa76cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -249,12 +249,7 @@ public abstract class RaftLogManager {
* @return lastIndex
*/
public long getLastLogIndex() {
- try {
- lock.readLock().lock();
- return entries.get(entries.size() - 1).getCurrLogIndex();
- } finally {
- lock.readLock().unlock();
- }
+ return getLastEntry().getCurrLogIndex();
}
public Entry getLastEntry() {
@@ -266,6 +261,29 @@ public abstract class RaftLogManager {
}
}
+ public Entry getLastEntryUnsafe() {
+ while (true) {
+ if (entries.isEmpty()) {
+ logger.error("{} should at least have one entry", name);
+ return getLastEntry();
+ }
+ try {
+ return entries.get(entries.size() - 1);
+ } catch (IndexOutOfBoundsException e) {
+ // ignore
+ }
+ }
+ }
+
+ public long getLastEntryIndexUnsafe() {
+ Entry lastEntryUnsafe = getLastEntryUnsafe();
+ if (lastEntryUnsafe != null) {
+ return lastEntryUnsafe.getCurrLogIndex();
+ } else {
+ return getLastLogIndex();
+ }
+ }
+
/**
* Returns the term for given index.
*
@@ -329,17 +347,23 @@ public abstract class RaftLogManager {
* Used by follower node to support leader's complicated log replication rpc parameters and try to
* commit entries.
*
- * @param lastIndex leader's matchIndex for this follower node
- * @param lastTerm the entry's term which index is leader's matchIndex for this follower node
- * @param entries entries sent from the leader node Note that the leader must ensure
- * entries[0].index = lastIndex + 1
- * @return -1 if the entries cannot be appended, otherwise the last index of new entries
+ * @param entries entries sent from the leader node; the append is rejected unless the local entry
+ * at entries[0].getCurrLogIndex() - 1 has term entries[0].getPrevTerm()
+ * @return true if entries is empty or was appended, false if the entries cannot be appended
*/
- public long maybeAppend(long lastIndex, long lastTerm, List<Entry> entries) {
+ public boolean maybeAppend(List<Entry> entries) {
+ if (entries.isEmpty()) {
+ return true;
+ }
+
+ long lastIndex = entries.get(0).getCurrLogIndex() - 1;
+ long lastTerm = entries.get(0).getPrevTerm();
+ long startTime = Statistic.RAFT_RECEIVER_WAIT_LOCK.getOperationStartTime();
try {
lock.writeLock().lock();
+ Statistic.RAFT_RECEIVER_WAIT_LOCK.calOperationCostTimeFromStart(startTime);
+ startTime = Statistic.RAFT_RECEIVER_APPEND_INTERNAL.getOperationStartTime();
if (matchTerm(lastTerm, lastIndex)) {
- long newLastIndex = lastIndex + entries.size();
long ci = findConflict(entries);
if (ci <= commitIndex) {
if (ci != -1) {
@@ -357,14 +381,25 @@ public abstract class RaftLogManager {
entries.size() - 1);
}
}
-
} else {
long offset = lastIndex + 1;
- append(entries.subList((int) (ci - offset), entries.size()), false);
+ try {
+ append(entries.subList((int) (ci - offset), entries.size()), false);
+ } catch (IllegalArgumentException e) {
+ logger.error(
+ "Appending {}, ci {}, offset {}, lastIndex {}, localLast {}",
+ entries,
+ ci,
+ offset,
+ lastIndex,
+ getLastLogIndex());
+ throw e;
+ }
}
- return newLastIndex;
+ Statistic.RAFT_RECEIVER_APPEND_INTERNAL.calOperationCostTimeFromStart(startTime);
+ return true;
}
- return -1;
+ return false;
} finally {
lock.writeLock().unlock();
}
@@ -378,7 +413,7 @@ public abstract class RaftLogManager {
* @return the newly generated lastIndex
*/
public long append(List<Entry> appendingEntries, boolean isLeader) {
- if (entries.isEmpty()) {
+ if (appendingEntries.isEmpty()) {
return getLastLogIndex();
}
@@ -410,7 +445,7 @@ public abstract class RaftLogManager {
}
}
- if (!isLeader) {
+ if (!isLeader && !config.isUseFollowerSlidingWindow()) {
// log update condition is to inform follower appending threads that the log is updated, and
// the leader does not concern it
Object logUpdateCondition =
@@ -427,11 +462,12 @@ public abstract class RaftLogManager {
* Used by leader node to try to commit entries.
*
* @param leaderCommit leader's commitIndex
- * @param term the entry's term which index is leaderCommit in leader's log module
+ * @param term the entry's term which index is leaderCommit in leader's log module, if term is -1,
+ * the commit is after a successful appending and unnecessary to check term
* @return true or false
*/
public boolean maybeCommit(long leaderCommit, long term) {
- if (leaderCommit > commitIndex && matchTerm(term, leaderCommit)) {
+ if (leaderCommit > commitIndex && (term == -1 || matchTerm(term, leaderCommit))) {
try {
commitTo(leaderCommit);
} catch (LogExecutionException e) {
@@ -597,6 +633,7 @@ public abstract class RaftLogManager {
entry.committedTime = System.nanoTime();
Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(entry.committedTime - entry.createTime);
}
+ entry.setSerializationCache(null);
}
}
@@ -612,9 +649,12 @@ public abstract class RaftLogManager {
try {
lock.writeLock().lock();
+ long startTime = Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.getOperationStartTime();
long lo = commitIndex + 1;
long hi = newCommitIndex + 1;
+ long getLogStart = Statistic.RAFT_SENDER_GET_LOG_FOR_COMMIT.getOperationStartTime();
List<Entry> entries = new ArrayList<>(getEntries(lo, hi));
+ Statistic.RAFT_SENDER_GET_LOG_FOR_COMMIT.calOperationCostTimeFromStart(getLogStart);
if (entries.isEmpty()) {
return;
@@ -629,6 +669,7 @@ public abstract class RaftLogManager {
checkCompaction(entries);
commitEntries(entries);
applyEntries(entries);
+ Statistic.RAFT_SENDER_COMMIT_HOLD_LOCK.calOperationCostTimeFromStart(startTime);
} finally {
lock.writeLock().unlock();
}
@@ -686,6 +727,9 @@ public abstract class RaftLogManager {
}
try {
logApplier.apply(entry);
+ if (entry.createTime != 0) {
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_APPLIER.add(System.nanoTime() - entry.createTime);
+ }
} catch (Exception e) {
entry.setException(e);
entry.setApplied(true);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index e348541ce2..a9aaae7d83 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -1353,7 +1353,7 @@ public class SyncLogDequeSerializer implements StableEntryManager {
byte[] bytes = ReadWriteIOUtils.readBytes(bufferedInputStream, logBlockSize);
ByteBuffer uncompressed = ByteBuffer.wrap(unCompressor.uncompress(bytes));
while (uncompressed.remaining() > 0) {
- Entry e = parser.parse(uncompressed);
+ Entry e = parser.parse(uncompressed, null);
result.add(e);
}
currentReadOffset = currentReadOffset + Integer.BYTES + logBlockSize;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index e27fdb86c4..9287f3b7dd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -54,10 +54,14 @@ public class DirectorySnapshot extends Snapshot {
serializeBase(dataOutputStream);
try {
- dataOutputStream.writeBytes(directory.getAbsolutePath());
+ byte[] bytes = directory.getAbsolutePath().getBytes(StandardCharsets.UTF_8);
+ dataOutputStream.writeInt(bytes.length);
+ dataOutputStream.write(bytes);
dataOutputStream.writeInt(filePaths.size());
for (Path filePath : filePaths) {
- dataOutputStream.writeBytes(filePath.toString());
+ byte[] pathBytes = filePath.toString().getBytes(StandardCharsets.UTF_8);
+ dataOutputStream.writeInt(pathBytes.length);
+ dataOutputStream.write(pathBytes);
}
} catch (IOException e) {
// unreachable
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index fced0f868a..564a3a954d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -81,7 +81,7 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
throws UnknownLogTypeException {
ConfigChangeEntry configChangeEntry = null;
for (ByteBuffer entryBuffer : request.entries) {
- Entry entry = LogParser.getINSTANCE().parse(entryBuffer);
+ Entry entry = LogParser.getINSTANCE().parse(entryBuffer, null);
if (entry instanceof ConfigChangeEntry) {
configChangeEntry = (ConfigChangeEntry) entry;
break;
@@ -160,8 +160,6 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
decompressedRequest
.setTerm(request.getTerm())
.setLeader(request.leader)
- .setPrevLogIndex(request.prevLogIndex)
- .setPrevLogTerm(request.prevLogTerm)
.setLeaderCommit(request.leaderCommit)
.setGroupId(request.groupId)
.setLeaderId(request.leaderId);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index c536763f37..5c7e2d6659 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.utils;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.natraft.exception.UnknownLogTypeException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
@@ -62,21 +63,12 @@ public class LogUtils {
entry.setByteSize(byteBuffer.array().length);
request.entry = byteBuffer;
}
- try {
- if (entry.getPrevTerm() != -1) {
- request.setPrevLogTerm(entry.getPrevTerm());
- } else {
- request.setPrevLogTerm(member.getLogManager().getTerm(entry.getCurrLogIndex() - 1));
- }
- } catch (Exception e) {
- logger.error("getTerm failed for newly append entries", e);
- }
+
request.setLeader(member.getThisNode().getEndpoint());
request.setLeaderId(member.getThisNode().getNodeId());
// don't need lock because even if it's larger than the commitIndex when appending this log to
// logManager, the follower can handle the larger commitIndex with no effect
request.setLeaderCommit(member.getLogManager().getCommitLogIndex());
- request.setPrevLogIndex(entry.getCurrLogIndex() - 1);
request.setGroupId(member.getRaftGroupId().convertToTConsensusGroupId());
return request;
@@ -138,14 +130,16 @@ public class LogUtils {
return buffers;
}
- public static List<Entry> parseEntries(List<ByteBuffer> buffers) throws UnknownLogTypeException {
+ public static List<Entry> parseEntries(List<ByteBuffer> buffers, IStateMachine stateMachine)
+ throws UnknownLogTypeException {
List<Entry> entries = new ArrayList<>();
for (ByteBuffer buffer : buffers) {
buffer.mark();
Entry e;
try {
- e = LogParser.getINSTANCE().parse(buffer);
- e.setByteSize(buffer.limit() - buffer.position());
+ e = LogParser.getINSTANCE().parse(buffer, stateMachine);
+ buffer.reset();
+ e.setSerializationCache(buffer);
} catch (BufferUnderflowException ex) {
buffer.reset();
throw ex;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
index 3a35b9dab6..9631b892fd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -202,6 +202,24 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_WAIT_FOR_WINDOW(
+ RAFT_MEMBER_RECEIVER,
+ "receiver wait for window",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_WAIT_LOCK(
+ RAFT_MEMBER_RECEIVER,
+ "receiver wait for lock",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_RECEIVER_APPEND_INTERNAL(
+ RAFT_MEMBER_RECEIVER,
+ "append entry (internal)",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_RECEIVER_APPEND_ENTRY(
RAFT_MEMBER_RECEIVER,
"append entrys",
@@ -290,18 +308,36 @@ public class Timer {
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_GET_LOG_FOR_COMMIT(
+ LOG_DISPATCHER,
+ "get log for commit",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_SENDER_LOG_FROM_CREATE_TO_READY_COMMIT(
LOG_DISPATCHER,
"from create to ready commit",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ RAFT_SENDER_COMMIT_HOLD_LOCK(
+ LOG_DISPATCHER,
+ "commit hold lock",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
LOG_DISPATCHER,
"from create to committed",
TIME_SCALE,
true,
META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+ LOG_DISPATCHER_FROM_CREATE_TO_APPLIER(
+ LOG_DISPATCHER,
+ "from create to applier",
+ TIME_SCALE,
+ true,
+ META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
LOG_DISPATCHER_FROM_CREATE_TO_APPLIED(
LOG_DISPATCHER,
"from create to applied",
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
index 7bbb716f14..dc2ee8e971 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/StatementGenerator.java
@@ -108,6 +108,7 @@ import java.util.Map;
/** Convert SQL and RPC requests to {@link Statement}. */
public class StatementGenerator {
+
// TODO @spricoder optimize the method adding metrics
public static Statement createStatement(String sql, ZoneId zoneId) {
return invokeParser(sql, zoneId);
@@ -278,13 +279,15 @@ public class StatementGenerator {
insertStatement.setDevicePath(new PartialPath(insertTabletReq.getPrefixPath()));
insertStatement.setMeasurements(insertTabletReq.getMeasurements().toArray(new String[0]));
insertStatement.setTimes(
- QueryDataSetUtils.readTimesFromBuffer(insertTabletReq.timestamps, insertTabletReq.size));
+ QueryDataSetUtils.readTimesFromBuffer(
+ insertTabletReq.timestamps, insertTabletReq.size, insertTabletReq.compression));
insertStatement.setColumns(
QueryDataSetUtils.readTabletValuesFromBuffer(
insertTabletReq.values,
insertTabletReq.types,
insertTabletReq.types.size(),
- insertTabletReq.size));
+ insertTabletReq.size,
+ insertTabletReq.compression));
insertStatement.setBitMaps(
QueryDataSetUtils.readBitMapsFromBuffer(
insertTabletReq.values, insertTabletReq.types.size(), insertTabletReq.size));
@@ -310,13 +313,15 @@ public class StatementGenerator {
insertTabletStatement.setDevicePath(new PartialPath(req.prefixPaths.get(i)));
insertTabletStatement.setMeasurements(req.measurementsList.get(i).toArray(new String[0]));
insertTabletStatement.setTimes(
- QueryDataSetUtils.readTimesFromBuffer(req.timestampsList.get(i), req.sizeList.get(i)));
+ QueryDataSetUtils.readTimesFromBuffer(
+ req.timestampsList.get(i), req.sizeList.get(i), req.compression));
insertTabletStatement.setColumns(
QueryDataSetUtils.readTabletValuesFromBuffer(
req.valuesList.get(i),
req.typesList.get(i),
req.measurementsList.get(i).size(),
- req.sizeList.get(i)));
+ req.sizeList.get(i),
+ req.compression));
insertTabletStatement.setBitMaps(
QueryDataSetUtils.readBitMapsFromBuffer(
req.valuesList.get(i), req.measurementsList.get(i).size(), req.sizeList.get(i)));
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
index c023792b95..19e157493f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertTabletNode.java
@@ -699,7 +699,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
rowCount = buffer.getInt();
times = new long[rowCount];
- times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+ times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount, null);
boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
if (hasBitMaps) {
@@ -969,7 +969,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue, ISche
rowCount = buffer.getInt();
times = new long[rowCount];
- times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount);
+ times = QueryDataSetUtils.readTimesFromBuffer(buffer, rowCount, null);
boolean hasBitMaps = BytesUtils.byteToBool(buffer.get());
if (hasBitMaps) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
index 6baee72738..5b21eb2fa3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.db.utils;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -271,7 +273,8 @@ public class QueryDataSetUtils {
return new Pair<>(res, !queryExecution.hasNextResult());
}
- public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) {
+ public static long[] readTimesFromBuffer(ByteBuffer buffer, int size, String compression) {
+ buffer = uncompressBuffer(buffer, compression);
long[] times = new long[size];
for (int i = 0; i < size; i++) {
times[i] = buffer.getLong();
@@ -324,8 +327,32 @@ public class QueryDataSetUtils {
return bitMaps;
}
+  /**
+   * Returns a buffer holding the decompressed remaining bytes of {@code buffer}.
+   *
+   * <p>When {@code compression} is {@code null} the input buffer itself is returned unchanged.
+   * Otherwise the bytes between {@code buffer.position()} and {@code buffer.limit()} are
+   * decompressed with the {@link IUnCompressor} for {@code CompressionType.valueOf(compression)}
+   * into a new heap buffer positioned at 0.
+   *
+   * <p>NOTE(review): in the compressed case the position of {@code buffer} is NOT advanced, so a
+   * caller that keeps reading the original buffer afterwards (e.g. bitmaps read from the same
+   * request buffer) sees the compressed bytes again — confirm callers read from the returned
+   * buffer instead.
+   *
+   * @param buffer the possibly compressed input; may be heap, direct or read-only
+   * @param compression name of a {@link CompressionType} constant, or {@code null} if the buffer
+   *     is not compressed
+   * @return {@code buffer} itself when {@code compression} is {@code null}, otherwise a new buffer
+   *     containing the decompressed bytes
+   * @throws IllegalArgumentException if {@code compression} is not a {@link CompressionType} name
+   * @throws RuntimeException wrapping the {@link IOException} if decompression fails
+   */
+  public static ByteBuffer uncompressBuffer(ByteBuffer buffer, String compression) {
+    if (compression == null) {
+      return buffer;
+    }
+    IUnCompressor unCompressor =
+        IUnCompressor.getUnCompressor(CompressionType.valueOf(compression));
+    // array() is unsupported for direct and read-only buffers; copy their remaining bytes out
+    // through a duplicate (leaving the caller's position untouched) instead of failing.
+    int length = buffer.remaining();
+    byte[] source;
+    int offset;
+    if (buffer.hasArray()) {
+      source = buffer.array();
+      offset = buffer.arrayOffset() + buffer.position();
+    } else {
+      source = new byte[length];
+      buffer.duplicate().get(source);
+      offset = 0;
+    }
+    try {
+      // NOTE(review): this length is derived from the (client-supplied) input bytes and is
+      // allocated without an upper bound.
+      int uncompressedLength = unCompressor.getUncompressedLength(source, offset, length);
+      byte[] uncompressed = new byte[uncompressedLength];
+      unCompressor.uncompress(source, offset, length, uncompressed, 0);
+      return ByteBuffer.wrap(uncompressed);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to uncompress buffer with " + compression, e);
+    }
+  }
+
public static Object[] readTabletValuesFromBuffer(
- ByteBuffer buffer, List<Integer> types, int columns, int size) {
+ ByteBuffer buffer, List<Integer> types, int columns, int size, String compression) {
+ buffer = uncompressBuffer(buffer, compression);
TSDataType[] dataTypes = new TSDataType[types.size()];
for (int i = 0; i < dataTypes.length; i++) {
dataTypes[i] = TSDataType.values()[types.get(i)];
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java b/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java
index 43adca20b2..02ec81bbbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/SetThreadName.java
@@ -20,24 +20,22 @@ package org.apache.iotdb.db.utils;
import java.io.Closeable;
-import static java.util.Objects.requireNonNull;
-
public class SetThreadName implements Closeable {
- private final String originalThreadName;
+  // NOTE(review): thread renaming is deliberately disabled on this branch, so this class is
+  // currently a no-op. The removed implementation renamed the current thread to
+  // <name up to its first '$'> + '$' + suffix and restored the original name in close(); it can be
+  // recovered from VCS history. The class is kept so existing callers still compile.
public SetThreadName(String suffix) {
- requireNonNull(suffix, "suffix is null");
- originalThreadName = Thread.currentThread().getName();
- int index = originalThreadName.indexOf("$");
- if (index < 0) {
- Thread.currentThread().setName(originalThreadName + '$' + suffix);
- } else {
- Thread.currentThread().setName(originalThreadName.substring(0, index) + '$' + suffix);
- }
+    // Intentionally empty: suffix is ignored (and not null-checked) while renaming is disabled.
}
@Override
public void close() {
- Thread.currentThread().setName(originalThreadName);
+    // Intentionally empty: the constructor did not rename the thread, so nothing is restored.
}
}
diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java
index 5043ef89e4..d3edee11d5 100644
--- a/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -141,6 +141,7 @@ public class Session implements ISession {
// The version number of the client which used for compatibility in the server
protected Version version;
+  // Compression applied to the timestamp and value buffers of tablet insert requests; the name is
+  // sent in the optional `compression` request field only when this is not UNCOMPRESSED.
+  // Defaults to UNCOMPRESSED: a server that predates that field skips it and would read the
+  // compressed buffers as raw data.
+  protected CompressionType compressionType = CompressionType.UNCOMPRESSED;
public Session(String host, int rpcPort) {
this(
@@ -2509,9 +2510,19 @@ public class Session implements ISession {
request.setPrefixPath(tablet.deviceId);
request.setIsAligned(isAligned);
- request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
- request.setValues(SessionUtils.getValueBuffer(tablet));
- request.setSize(tablet.rowSize);
+ try {
+ request.setTimestamps(SessionUtils.getTimeBuffer(tablet, compressionType));
+ request.setValues(SessionUtils.getValueBuffer(tablet, compressionType));
+ request.setSize(tablet.rowSize);
+ if (compressionType != CompressionType.UNCOMPRESSED) {
+ request.setCompression(compressionType.toString());
+ }
+ } catch (IOException e) {
+ request.setTimestamps(SessionUtils.getTimeBuffer(tablet));
+ request.setValues(SessionUtils.getValueBuffer(tablet));
+ request.setSize(tablet.rowSize);
+ }
+
return request;
}
diff --git a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
index 32ed19add9..0bbd3d0cb9 100644
--- a/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
+++ b/session/src/main/java/org/apache/iotdb/session/util/SessionUtils.java
@@ -21,7 +21,9 @@ package org.apache.iotdb.session.util;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.compress.ICompressor;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.tsfile.utils.BitMap;
@@ -33,6 +35,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -272,4 +275,24 @@ public class SessionUtils {
throw new NumberFormatException("NodeUrl Incorrect format");
}
}
+
+  /**
+   * Serializes the timestamps of {@code tablet} with {@link #getTimeBuffer(Tablet)} and compresses
+   * the result.
+   *
+   * @param tablet the tablet whose timestamps are serialized
+   * @param compressionType the compression applied to the serialized bytes
+   * @return the compressed timestamp bytes
+   * @throws IOException if the compressor fails
+   */
+  public static byte[] getTimeBuffer(Tablet tablet, CompressionType compressionType)
+      throws IOException {
+    return compressBuffer(getTimeBuffer(tablet), compressionType);
+  }
+
+  /**
+   * Serializes the values of {@code tablet} with {@link #getValueBuffer(Tablet)} and compresses the
+   * result.
+   *
+   * @param tablet the tablet whose values are serialized
+   * @param compressionType the compression applied to the serialized bytes
+   * @return the compressed value bytes
+   * @throws IOException if the compressor fails
+   */
+  public static byte[] getValueBuffer(Tablet tablet, CompressionType compressionType)
+      throws IOException {
+    return compressBuffer(getValueBuffer(tablet), compressionType);
+  }
+
+  /**
+   * Compresses the remaining bytes of {@code buffer} without changing its position.
+   *
+   * <p>NOTE(review): requires an array-backed buffer ({@code array()} throws otherwise); the
+   * results of getTimeBuffer/getValueBuffer are presumably heap buffers — confirm.
+   */
+  private static byte[] compressBuffer(ByteBuffer buffer, CompressionType compressionType)
+      throws IOException {
+    ICompressor compressor = ICompressor.getCompressor(compressionType);
+    return compressor.compress(
+        buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+  }
}
diff --git a/thrift-raft/src/main/thrift/raft.thrift b/thrift-raft/src/main/thrift/raft.thrift
index 67466b2080..95eaeb441d 100644
--- a/thrift-raft/src/main/thrift/raft.thrift
+++ b/thrift-raft/src/main/thrift/raft.thrift
@@ -24,32 +24,27 @@ struct AppendEntriesRequest {
1: required i64 term // leader's
2: required common.TEndPoint leader
3: required list<binary> entries // data
- 4: required i64 prevLogIndex
- 5: required i64 prevLogTerm
- 6: required i64 leaderCommit
- 7: required common.TConsensusGroupId groupId
- 8: required i32 leaderId
+ 6: required i64 leaderCommit // ids 4 (prevLogIndex) and 5 (prevLogTerm) are retired; never reuse them
+ 7: required common.TConsensusGroupId groupId
+ 8: required i32 leaderId
}
struct AppendCompressedEntriesRequest {
1: required i64 term // leader's
2: required common.TEndPoint leader
3: required binary entryBytes // data
- 4: required i64 prevLogIndex
- 5: required i64 prevLogTerm
- 6: required i64 leaderCommit
- 7: required common.TConsensusGroupId groupId
- 8: required i32 leaderId
- 9: required i8 compressionType
+ 6: required i64 leaderCommit // ids 4 (prevLogIndex) and 5 (prevLogTerm) are retired; never reuse them
+ 7: required common.TConsensusGroupId groupId
+ 8: required i32 leaderId
+ 9: required i8 compressionType
}
struct AppendEntryResult {
1: required i64 status;
- 2: optional i64 lastLogTerm;
- 3: optional i64 lastLogIndex;
- 4: optional common.TConsensusGroupId groupId;
- 5: optional common.TEndPoint receiver;
- 6: optional i32 receiverId;
+ 3: optional i64 lastLogIndex; // id 2 (lastLogTerm) is retired; never reuse it
+ 4: optional common.TConsensusGroupId groupId;
+ 5: optional common.TEndPoint receiver;
+ 6: optional i32 receiverId;
}
// leader -> follower
diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift
index 357589db82..e888554fb0 100644
--- a/thrift/src/main/thrift/client.thrift
+++ b/thrift/src/main/thrift/client.thrift
@@ -233,6 +233,7 @@ struct TSInsertTabletReq {
6: required list<i32> types
7: required i32 size
8: optional bool isAligned
+ 9: optional string compression
}
struct TSInsertTabletsReq {
@@ -244,6 +245,7 @@ struct TSInsertTabletsReq {
6: required list<list<i32>> typesList
7: required list<i32> sizeList
8: optional bool isAligned
+ 9: optional string compression
}
struct TSInsertRecordsReq {