You are viewing a plain text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/03/07 08:27:30 UTC
[iotdb] branch native_raft updated: fix issues
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new 7aca6e85c0 fix issues
7aca6e85c0 is described below
commit 7aca6e85c098d1b643e312ee36b7037409497e91
Author: Tian Jiang <jt...@163.com>
AuthorDate: Tue Mar 7 16:29:17 2023 +0800
fix issues
---
.../common/request/IConsensusRequest.java | 14 +
.../iotdb/consensus/natraft/RaftConsensus.java | 23 +-
.../natraft/client/AsyncRaftServiceClient.java | 3 +-
.../consensus/natraft/client/GenericHandler.java | 2 +-
.../natraft/client/SyncClientAdaptor.java | 3 +-
.../consensus/natraft/protocol/RaftConfig.java | 4 +-
.../consensus/natraft/protocol/RaftMember.java | 148 +++----
.../protocol/heartbeat/ElectionRespHandler.java | 12 +-
.../natraft/protocol/heartbeat/ElectionState.java | 30 +-
.../protocol/heartbeat/HeartbeatThread.java | 35 +-
.../consensus/natraft/protocol/log/Entry.java | 31 +-
.../natraft/protocol/log/VotingEntry.java | 8 +-
.../protocol/log/appender/BlockingLogAppender.java | 44 +-
.../log/appender/SlidingWindowLogAppender.java | 16 +-
.../protocol/log/applier/AsyncLogApplier.java | 71 ++--
.../natraft/protocol/log/applier/BaseApplier.java | 5 +
.../natraft/protocol/log/applier/LogApplier.java | 3 +
.../log/dispatch/AppendNodeEntryHandler.java | 7 +-
.../protocol/log/dispatch/LogDispatcher.java | 51 +--
.../protocol/log/dispatch/VotingLogList.java | 35 +-
.../protocol/log/logtype/ConfigChangeEntry.java | 5 +-
.../manager/DirectorySnapshotRaftLogManager.java | 2 +-
.../protocol/log/manager/RaftLogManager.java | 18 +-
.../log/sequencing/LogSequencerFactory.java | 1 -
.../log/sequencing/SynchronousSequencer.java | 4 +-
.../protocol/log/snapshot/DirectorySnapshot.java | 8 +-
.../natraft/protocol/log/snapshot/Snapshot.java | 8 +-
.../natraft/service/RaftRPCServiceProcessor.java | 12 +-
.../iotdb/consensus/natraft/utils/LogUtils.java | 11 +-
.../iotdb/consensus/natraft/utils/NodeUtils.java | 23 +-
.../iotdb/consensus/natraft/utils/StatusUtils.java | 2 -
.../iotdb/consensus/natraft/utils/Timer.java | 447 +++++++++++++++++++++
.../plan/planner/plan/node/write/InsertNode.java | 5 +
33 files changed, 808 insertions(+), 283 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index f2729471d5..f18fd7413d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -19,9 +19,12 @@
package org.apache.iotdb.consensus.common.request;
+import org.apache.iotdb.commons.path.PartialPath;
+
import java.nio.ByteBuffer;
public interface IConsensusRequest {
+
/**
* Serialize all the data to a ByteBuffer.
*
@@ -39,4 +42,15 @@ public interface IConsensusRequest {
default long estimateSize() {
return 0;
}
+
+ /**
+ * If two requests return the same conflictKey or one of them returns null, they cannot be
+ * executed in parallel in the same region. Otherwise, the two requests with different
+ * conflictKeys can be executed in parallel.
+ *
+ * @return a conflict key identifying requests that cannot be executed in parallel.
+ */
+ default PartialPath conflictKey() {
+ return null;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 8c3231d21c..a07afb1e97 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -49,6 +49,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.Flow
import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -102,6 +103,12 @@ public class RaftConsensus implements IConsensus {
} catch (StartupException e) {
throw new IOException(e);
}
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ logger.info(Timer.Statistic.getReport());
+ }));
}
private void initAndRecover() throws IOException {
@@ -185,8 +192,8 @@ public class RaftConsensus implements IConsensus {
stateMachineMap.remove(groupId);
}
- public boolean createNewMemberIfAbsent(ConsensusGroupId groupId, Peer thisPeer,
- List<Peer> peers, List<Peer> newPeers) {
+ public boolean createNewMemberIfAbsent(
+ ConsensusGroupId groupId, Peer thisPeer, List<Peer> peers, List<Peer> newPeers) {
AtomicBoolean exist = new AtomicBoolean(true);
stateMachineMap.computeIfAbsent(
groupId,
@@ -199,8 +206,15 @@ public class RaftConsensus implements IConsensus {
}
RaftMember impl =
new RaftMember(
- path, config, thisPeer, peers, newPeers, groupId, registry.apply(groupId),
- clientManager, this::onMemberRemoved);
+ path,
+ config,
+ thisPeer,
+ peers,
+ newPeers,
+ groupId,
+ registry.apply(groupId),
+ clientManager,
+ this::onMemberRemoved);
impl.start();
return impl;
});
@@ -348,7 +362,6 @@ public class RaftConsensus implements IConsensus {
return stateMachineMap.get(groupId);
}
-
public int getThisNodeId() {
return thisNodeId;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
index 2366d0e886..43f70d2b04 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
@@ -92,7 +92,8 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
@Override
public void onError(Exception e) {
if (e.getCause() instanceof NoMemberException
- || e instanceof TApplicationException && e.getMessage().contains("No such member")) {
+ || e instanceof TApplicationException
+ && (e.getMessage() != null && e.getMessage().contains("No such member"))) {
logger.debug(e.getMessage());
___currentMethod = null;
returnSelf();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java
index f5b3370ac7..d3f77dd23e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/GenericHandler.java
@@ -71,7 +71,7 @@ public class GenericHandler<T> implements AsyncMethodCallback<T> {
while (elapsedTime < timeout) {
if (result.get() == null && getException() == null) {
synchronized (result) {
- result.wait(1000);
+ result.wait(1);
}
} else {
break;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
index a5be4ae46a..82196bf9fa 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/SyncClientAdaptor.java
@@ -83,8 +83,7 @@ public class SyncClientAdaptor {
return matchTermHandler.getResult(config.getConnectionTimeoutInMS());
}
- public static TSStatus forceElection(
- AsyncRaftServiceClient client, ConsensusGroupId groupId)
+ public static TSStatus forceElection(AsyncRaftServiceClient client, ConsensusGroupId groupId)
throws TException, InterruptedException {
GenericHandler<TSStatus> matchTermHandler = new GenericHandler<>(client.getEndpoint());
client.forceElection(groupId.convertToTConsensusGroupId(), matchTermHandler);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
index b29d51c951..78fb556cbd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftConfig.java
@@ -31,14 +31,14 @@ import java.util.concurrent.TimeUnit;
public class RaftConfig {
- private boolean enableWeakAcceptance = true;
+ private boolean enableWeakAcceptance = false;
private int maxNumOfLogsInMem = 10000;
private int minNumOfLogsInMem = 1000;
private long maxMemorySizeForRaftLog = 512 * 1024 * 1024L;
private int logDeleteCheckIntervalSecond = 1;
private boolean enableRaftLogPersistence = true;
private int catchUpTimeoutMS = 60_000;
- private boolean useFollowerSlidingWindow = true;
+ private boolean useFollowerSlidingWindow = false;
private int uncommittedRaftLogNumForRejectThreshold = 10000;
private int heartbeatIntervalMs = 1000;
private int electionTimeoutMs = 20_000;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 9ac0f1ed86..77a199e515 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1,29 +1,27 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
-
-
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+
+
+*/
package org.apache.iotdb.consensus.natraft.protocol;
-import java.util.Collections;
-import java.util.function.Consumer;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
@@ -78,6 +76,7 @@ import org.apache.iotdb.consensus.natraft.utils.LogUtils;
import org.apache.iotdb.consensus.natraft.utils.NodeUtils;
import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -100,6 +99,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -107,6 +107,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
public class RaftMember {
@@ -132,9 +133,7 @@ public class RaftMember {
*/
private final Object waitLeaderCondition = new Object();
- /**
- * the lock is to make sure that only one thread can apply snapshot at the same time
- */
+ /** the lock is to make sure that only one thread can apply snapshot at the same time */
private final Lock snapshotApplyLock = new ReentrantLock();
/**
* when the commit progress is updated by a heartbeat, this object is notified so that we may know
@@ -143,10 +142,9 @@ public class RaftMember {
private final Object syncLock = new Object();
protected Peer thisNode;
- /**
- * the nodes that belong to the same raft group as thisNode.
- */
+ /** the nodes that belong to the same raft group as thisNode. */
protected volatile List<Peer> allNodes;
+
protected volatile List<Peer> newNodes;
protected ConsensusGroupId groupId;
@@ -155,9 +153,7 @@ public class RaftMember {
protected RaftStatus status = new RaftStatus();
- /**
- * the raft logs are all stored and maintained in the log manager
- */
+ /** the raft logs are all stored and maintained in the log manager */
protected RaftLogManager logManager;
protected HeartbeatThread heartbeatThread;
@@ -177,15 +173,12 @@ public class RaftMember {
protected IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
protected CatchUpManager catchUpManager;
- /**
- * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
- */
+ /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
- * logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
*/
private volatile LogDispatcher logDispatcher;
@@ -209,7 +202,6 @@ public class RaftMember {
Consumer<ConsensusGroupId> onRemove) {
this.config = config;
this.storageDir = storageDir;
- initConfig();
this.thisNode = thisNode;
this.allNodes = allNodes;
@@ -232,9 +224,7 @@ public class RaftMember {
this.stateMachine = stateMachine;
this.votingLogList = new VotingLogList(this);
- this.logAppender = appenderFactory.create(this, config);
- this.logSequencer = SEQUENCER_FACTORY.create(this, config);
- this.logDispatcher = new LogDispatcher(this, config);
+ votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
this.heartbeatReqHandler = new HeartbeatReqHandler(this);
this.electionReqHandler = new ElectionReqHandler(this);
this.logManager =
@@ -245,6 +235,9 @@ public class RaftMember {
stateMachine,
config,
this::examineUnappliedEntry);
+ this.logAppender = appenderFactory.create(this, config);
+ this.logSequencer = SEQUENCER_FACTORY.create(this, config);
+ this.logDispatcher = new LogDispatcher(this, config);
this.onRemove = onRemove;
initPeerMap();
@@ -282,7 +275,7 @@ public class RaftMember {
private void examineUnappliedEntry(List<Entry> entries) {
ConfigChangeEntry configChangeEntry = null;
for (Entry entry : entries) {
- if (entry instanceof ConfigChangeEntry) {
+ if (entry instanceof ConfigChangeEntry) {
configChangeEntry = (ConfigChangeEntry) entry;
}
}
@@ -362,10 +355,6 @@ public class RaftMember {
}
}
- private void initConfig() {
- votingLogList.setEnableWeakAcceptance(config.isEnableWeakAcceptance());
- }
-
public void initPeerMap() {
status.peerMap = new ConcurrentHashMap<>();
for (Peer peer : allNodes) {
@@ -509,9 +498,7 @@ public class RaftMember {
return appendEntriesInternal(request);
}
- /**
- * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
- */
+ /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
@@ -588,7 +575,7 @@ public class RaftMember {
private boolean checkLogSize(Entry entry) {
return !config.isEnableRaftLogPersistence()
- || entry.serialize().capacity() + Integer.BYTES < config.getRaftLogBufferSize();
+ || entry.estimateSize() < config.getRaftLogBufferSize();
}
public boolean isReadOnly() {
@@ -635,11 +622,12 @@ public class RaftMember {
} else if (request != null) {
return forwardRequest(request, leader.getEndpoint(), leader.getGroupId());
} else {
- return new TSStatus().setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
+ return new TSStatus()
+ .setCode(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())
.setRedirectNode(leader.getEndpoint());
}
}
- return StatusUtils.OK;
+ return null;
}
public TSStatus processRequest(IConsensusRequest request) {
@@ -648,12 +636,13 @@ public class RaftMember {
}
TSStatus tsStatus = ensureLeader(request);
- if (tsStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ if (tsStatus != null) {
return tsStatus;
}
logger.debug("{}: Processing request {}", name, request);
Entry entry = new RequestEntry(request);
+ entry.receiveTime = System.nanoTime();
// just like processPlanLocally,we need to check the size of log
if (!checkLogSize(entry)) {
@@ -665,6 +654,9 @@ public class RaftMember {
// assign term and index to the new log and append it
VotingEntry votingEntry = logSequencer.sequence(entry);
+ entry.createTime = System.nanoTime();
+ Statistic.LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE.add(entry.createTime - entry.receiveTime);
+
if (config.isUseFollowerLoadBalance()) {
FlowMonitorManager.INSTANCE.report(thisNode, entry.estimateSize());
}
@@ -690,6 +682,9 @@ public class RaftMember {
}
}
}
+ entry.waitEndTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END.add(
+ entry.waitEndTime - entry.createTime);
if (entry.getException() != null) {
throw new LogExecutionException(entry.getException());
}
@@ -697,14 +692,20 @@ public class RaftMember {
private TSStatus includeLogNumbersInStatus(TSStatus status, Entry entry) {
return status.setMessage(
- getRaftGroupId() + "-" + entry.getCurrLogIndex() + "-" + entry.getCurrLogTerm());
+ getRaftGroupId().getType().ordinal()
+ + "-"
+ + getRaftGroupId().getId()
+ + "-"
+ + entry.getCurrLogIndex()
+ + "-"
+ + entry.getCurrLogTerm());
}
protected AppendLogResult waitAppendResult(VotingEntry votingEntry) {
// wait for the followers to vote
AcceptedType acceptedType = votingLogList.computeAcceptedType(votingEntry);
- if (votingLogList.computeAcceptedType(votingEntry) == AcceptedType.NOT_ACCEPTED) {
+ if (acceptedType == AcceptedType.NOT_ACCEPTED) {
acceptedType = waitAppendResultLoop(votingEntry);
}
@@ -760,15 +761,16 @@ public class RaftMember {
}
long waitTime = 1;
AcceptedType acceptedType = votingLogList.computeAcceptedType(log);
- synchronized (log.getEntry()) {
- while (votingLogList.computeAcceptedType(log) == AcceptedType.NOT_ACCEPTED
+ synchronized (log) {
+ while (acceptedType == AcceptedType.NOT_ACCEPTED
&& alreadyWait < config.getWriteOperationTimeoutMS()) {
try {
- log.getEntry().wait(waitTime);
+ log.wait(waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Unexpected interruption when sending a log", e);
}
+ acceptedType = votingLogList.computeAcceptedType(log);
waitTime = waitTime * 2;
alreadyWait = (System.nanoTime() - waitStart) / 1000000;
@@ -851,7 +853,7 @@ public class RaftMember {
* it.
*
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
protected void waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
@@ -882,7 +884,7 @@ public class RaftMember {
* sync local applyId to leader commitId
*
* @param leaderCommitId leader commit id
- * @param fastFail if enabled, when log differ too much, return false directly.
+ * @param fastFail if enabled, when log differ too much, return false directly.
*/
public void syncLocalApply(long leaderCommitId, boolean fastFail) {
long startTime = System.currentTimeMillis();
@@ -927,9 +929,7 @@ public class RaftMember {
waitedTime);
}
- /**
- * Wait until the leader of this node becomes known or time out.
- */
+ /** Wait until the leader of this node becomes known or time out. */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (status.leader.get() == null || status.leader.get() == null) {
@@ -966,9 +966,7 @@ public class RaftMember {
return handler.getResult(config.getConnectionTimeoutInMS());
}
- /**
- * @return true if there is a log whose index is "index" and term is "term", false otherwise
- */
+ /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -978,10 +976,10 @@ public class RaftMember {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param groupId must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
@@ -1002,13 +1000,14 @@ public class RaftMember {
this.status.leader.set(null);
waitLeader();
}
+ status.setRedirectNode(node);
return status;
}
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param groupId to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
@@ -1027,10 +1026,10 @@ public class RaftMember {
public TSStatus forwardPlanAsync(
IConsensusRequest request,
TEndPoint receiver,
- ConsensusGroupId header,
+ ConsensusGroupId groupId,
AsyncRaftServiceClient client) {
try {
- TSStatus tsStatus = SyncClientAdaptor.executeRequest(client, request, header, receiver);
+ TSStatus tsStatus = SyncClientAdaptor.executeRequest(client, request, groupId, receiver);
if (tsStatus == null) {
tsStatus = StatusUtils.TIME_OUT;
logger.warn(MSG_FORWARD_TIMEOUT, name, request, receiver);
@@ -1222,8 +1221,8 @@ public class RaftMember {
try {
logManager.getLock().writeLock().lock();
if (this.newNodes != null) {
- return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode()).setMessage(
- "Last configuration change in progress");
+ return new TSStatus(TSStatusCode.CONFIGURATION_ERROR.getStatusCode())
+ .setMessage("Last configuration change in progress");
}
ConfigChangeEntry e = new ConfigChangeEntry(oldNodes, newNodes);
Entry lastEntry = logManager.getLastEntry();
@@ -1254,8 +1253,9 @@ public class RaftMember {
private TSStatus waitForEntryResult(VotingEntry votingEntry) {
try {
- AppendLogResult appendLogResult =
- waitAppendResult(votingEntry);
+ AppendLogResult appendLogResult = waitAppendResult(votingEntry);
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END.add(
+ System.nanoTime() - votingEntry.getEntry().createTime);
switch (appendLogResult) {
case WEAK_ACCEPT:
return includeLogNumbersInStatus(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
index 301cb19ad4..1f3ba68695 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
@@ -28,8 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_AGREE;
import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_LEADER_STILL_ONLINE;
@@ -50,10 +48,7 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
private ElectionState electionState;
public ElectionRespHandler(
- RaftMember raftMember,
- Peer voter,
- long currTerm,
- ElectionState electionState) {
+ RaftMember raftMember, Peer voter, long currTerm, ElectionState electionState) {
this.raftMember = raftMember;
this.voter = voter;
this.currTerm = currTerm;
@@ -71,10 +66,7 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
if (voterResp == RESPONSE_AGREE) {
electionState.onAccept(voter);
- logger.info(
- "{}: Received a grant vote from {}",
- memberName,
- voter);
+ logger.info("{}: Received a grant vote from {}", memberName, voter);
if (electionState.isAccepted()) {
// the election is valid
logger.info("{}: Election {} is won", memberName, currTerm);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
index 407252fe2c..bcb2a656b4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionState.java
@@ -1,9 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
+import org.apache.iotdb.consensus.common.Peer;
+
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.iotdb.consensus.common.Peer;
public class ElectionState {
private List<Peer> currNodes;
@@ -31,8 +51,8 @@ public class ElectionState {
if (newNodes != null && newNodes.contains(node)) {
acceptedNewNodes.add(node);
}
- if (acceptedCurrNodes.size() >= currNodes.size() / 2 &&
- (newNodes == null || (acceptedNewNodes.size() >= newNodes.size() / 2))) {
+ if (acceptedCurrNodes.size() >= currNodes.size() / 2
+ && (newNodes == null || (acceptedNewNodes.size() >= newNodes.size() / 2))) {
accepted = true;
synchronized (this) {
this.notifyAll();
@@ -47,8 +67,8 @@ public class ElectionState {
if (newNodes != null && newNodes.contains(node)) {
rejectedNewNodes.add(node);
}
- if (rejectedCurrNodes.size() >= currNodes.size() / 2 + 1 &&
- (newNodes == null || (rejectedNewNodes.size() >= newNodes.size() / 2 + 1))) {
+ if (rejectedCurrNodes.size() >= currNodes.size() / 2 + 1
+ && (newNodes == null || (rejectedNewNodes.size() >= newNodes.size() / 2 + 1))) {
rejected = true;
synchronized (this) {
this.notifyAll();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
index 5939f1c577..660e6feff6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatThread.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
-import java.util.Collections;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -37,8 +36,6 @@ import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* HeartbeatThread takes the responsibility to send heartbeats (when this node is a leader), check
@@ -168,9 +165,7 @@ public class HeartbeatThread implements Runnable {
logger.info("{}: End elections", memberName);
}
- /**
- * Send each node (except the local node) in the group of the member a heartbeat.
- */
+ /** Send each node (except the local node) in the group of the member a heartbeat. */
protected void sendHeartbeats() {
try {
localMember.getLogManager().getLock().readLock().lock();
@@ -186,9 +181,7 @@ public class HeartbeatThread implements Runnable {
sendHeartbeats(localMember.getAllNodes());
}
- /**
- * Send each node (except the local node) in list a heartbeat.
- */
+ /** Send each node (except the local node) in list a heartbeat. */
@SuppressWarnings("java:S2445")
private void sendHeartbeats(Collection<Peer> nodes) {
logger.debug(
@@ -298,13 +291,10 @@ public class HeartbeatThread implements Runnable {
electionRequest.setLastLogIndex(localMember.getLogManager().getLastLogIndex());
electionRequest.setGroupId(localMember.getRaftGroupId().convertToTConsensusGroupId());
- ElectionState electionState = new ElectionState(localMember.getAllNodes(),
- localMember.getNewNodes());
+ ElectionState electionState =
+ new ElectionState(localMember.getAllNodes(), localMember.getNewNodes());
- requestVote(
- electionState,
- electionRequest,
- nextTerm);
+ requestVote(electionState, electionRequest, nextTerm);
try {
logger.info(
@@ -333,13 +323,10 @@ public class HeartbeatThread implements Runnable {
* Any against vote will set the flag "electionTerminated" to true and ends the election.
*/
@SuppressWarnings("java:S2445")
- private void requestVote(
- ElectionState electionState,
- ElectionRequest request,
- long nextTerm) {
+ private void requestVote(ElectionState electionState, ElectionRequest request, long nextTerm) {
- Collection<Peer> peers = NodeUtils.unionNodes(electionState.getCurrNodes(),
- electionState.getNewNodes());
+ Collection<Peer> peers =
+ NodeUtils.unionNodes(electionState.getCurrNodes(), electionState.getNewNodes());
// avoid concurrent modification
for (Peer node : peers) {
if (node.equals(localMember.getThisNode())) {
@@ -347,11 +334,7 @@ public class HeartbeatThread implements Runnable {
}
ElectionRespHandler handler =
- new ElectionRespHandler(
- localMember,
- node,
- nextTerm,
- electionState);
+ new ElectionRespHandler(localMember, node, nextTerm, electionState);
requestVoteAsync(node, handler, request);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
index 1748518078..fec0bc7035 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/Entry.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.consensus.natraft.protocol.log;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.Objects;
@@ -45,6 +47,14 @@ public abstract class Entry implements Comparable<Entry> {
private volatile Exception exception;
private long byteSize = 0;
+ private boolean fromThisNode = false;
+
+ public long receiveTime;
+ public long createTime;
+ public long acceptedTime;
+ public long committedTime;
+ public long applyTime;
+ public long waitEndTime;
public int getDefaultSerializationBufferSize() {
return DEFAULT_SERIALIZATION_BUFFER_SIZE;
@@ -87,9 +97,16 @@ public abstract class Entry implements Comparable<Entry> {
}
public void setApplied(boolean applied) {
- synchronized (this) {
- this.applied = applied;
- this.notifyAll();
+ this.applied = applied;
+
+ if (createTime != 0) {
+ applyTime = System.nanoTime();
+ Statistic.LOG_DISPATCHER_FROM_CREATE_TO_APPLIED.add(applyTime - createTime);
+ }
+ if (fromThisNode) {
+ synchronized (this) {
+ this.notifyAll();
+ }
}
}
@@ -142,4 +159,12 @@ public abstract class Entry implements Comparable<Entry> {
public void setPrevTerm(long prevTerm) {
this.prevTerm = prevTerm;
}
+
+ public boolean isFromThisNode() {
+ return fromThisNode;
+ }
+
+ public void setFromThisNode(boolean fromThisNode) {
+ this.fromThisNode = fromThisNode;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
index 30be376f74..0d6cf613bf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/VotingEntry.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log;
-import java.util.List;
-import java.util.Map;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
@@ -28,6 +26,8 @@ import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
@@ -51,8 +51,8 @@ public class VotingEntry {
RaftConfig config) {
this.entry = entry;
if (config.isUseFollowerSlidingWindow()) {
- weaklyAcceptedNodes = new HashSet<>(
- currNodes.size() + (newNodes != null ? newNodes.size() : 0));
+ weaklyAcceptedNodes =
+ new HashSet<>(currNodes.size() + (newNodes != null ? newNodes.size() : 0));
}
this.setAppendEntryRequest(appendEntryRequest);
this.currNodes = currNodes;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index d8b3ae16f3..ed4cb9bff0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -61,7 +61,7 @@ public class BlockingLogAppender implements LogAppender {
* and append "log" to it. Otherwise report a log mismatch.
*
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
public AppendEntryResult appendEntry(AppendEntryRequest request, Entry log) {
long resp = checkPrevLogIndex(request.prevLogIndex);
@@ -79,10 +79,9 @@ public class BlockingLogAppender implements LogAppender {
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
success =
logManager.maybeAppend(
- request.prevLogIndex,
- request.prevLogTerm,
- request.leaderCommit,
- Collections.singletonList(log));
+ request.prevLogIndex, request.prevLogTerm, Collections.singletonList(log));
+ member.tryUpdateCommitIndex(
+ request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
break;
}
try {
@@ -107,9 +106,7 @@ public class BlockingLogAppender implements LogAppender {
return result;
}
- /**
- * Wait until all logs before "prevLogIndex" arrive or a timeout is reached.
- */
+ /** Wait until all logs before "prevLogIndex" arrive or a timeout is reached. */
private boolean waitForPrevLog(long prevLogIndex) {
long waitStart = System.currentTimeMillis();
long alreadyWait = 0;
@@ -154,7 +151,7 @@ public class BlockingLogAppender implements LogAppender {
*
* @param logs append logs
* @return Response.RESPONSE_AGREE when the log is successfully appended or Response
- * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
+ * .RESPONSE_LOG_MISMATCH if the previous log of "log" is not found.
*/
public AppendEntryResult appendEntries(AppendEntriesRequest request, List<Entry> logs) {
logger.debug(
@@ -186,8 +183,10 @@ public class BlockingLogAppender implements LogAppender {
while (true) {
if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
- resp = lastConfigEntry == null ? appendWithoutConfigChange(request, logs, result)
- : appendWithConfigChange(request, logs, result, lastConfigEntry);
+ resp =
+ lastConfigEntry == null
+ ? appendWithoutConfigChange(request, logs, result)
+ : appendWithConfigChange(request, logs, result, lastConfigEntry);
break;
}
@@ -206,14 +205,15 @@ public class BlockingLogAppender implements LogAppender {
return result;
}
- protected long appendWithConfigChange(AppendEntriesRequest request, List<Entry> logs,
- AppendEntryResult result, ConfigChangeEntry configChangeEntry) {
+ protected long appendWithConfigChange(
+ AppendEntriesRequest request,
+ List<Entry> logs,
+ AppendEntryResult result,
+ ConfigChangeEntry configChangeEntry) {
long resp;
try {
logManager.getLock().writeLock().lock();
- resp =
- logManager.maybeAppend(
- request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+ resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
@@ -228,6 +228,8 @@ public class BlockingLogAppender implements LogAppender {
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
member.setNewNodes(configChangeEntry.getNewPeers());
+ member.tryUpdateCommitIndex(
+ request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
} else {
// the incoming log points to an illegal position, reject it
result.status = Response.RESPONSE_LOG_MISMATCH;
@@ -238,11 +240,9 @@ public class BlockingLogAppender implements LogAppender {
return resp;
}
- protected long appendWithoutConfigChange(AppendEntriesRequest request, List<Entry> logs,
- AppendEntryResult result) {
- long resp =
- logManager.maybeAppend(
- request.prevLogIndex, request.prevLogTerm, request.leaderCommit, logs);
+ protected long appendWithoutConfigChange(
+ AppendEntriesRequest request, List<Entry> logs, AppendEntryResult result) {
+ long resp = logManager.maybeAppend(request.prevLogIndex, request.prevLogTerm, logs);
if (resp != -1) {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -254,6 +254,8 @@ public class BlockingLogAppender implements LogAppender {
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
+ member.tryUpdateCommitIndex(
+ request.getTerm(), request.leaderCommit, logManager.getTerm(request.leaderCommit));
} else {
// the incoming log points to an illegal position, reject it
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index 4572895c49..ab89f354cf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -138,11 +138,10 @@ public class SlidingWindowLogAppender implements LogAppender {
// TODO: Consider memory footprint to execute a precise rejection
if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
<= config.getUnAppliedRaftLogNumForRejectThreshold()) {
- synchronized (logManager) {
- success =
- logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
- break;
- }
+ success = logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, logs);
+ member.tryUpdateCommitIndex(
+ member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
+ break;
}
try {
TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
@@ -211,13 +210,14 @@ public class SlidingWindowLogAppender implements LogAppender {
long appendedPos = 0;
AppendEntryResult result = new AppendEntryResult();
- synchronized (logManager) {
+ synchronized (this) {
int windowPos = (int) (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
if (windowPos < 0) {
// the new entry may replace an appended entry
appendedPos =
- logManager.maybeAppend(
- prevLogIndex, prevLogTerm, leaderCommit, Collections.singletonList(entry));
+ logManager.maybeAppend(prevLogIndex, prevLogTerm, Collections.singletonList(entry));
+ member.tryUpdateCommitIndex(
+ member.getStatus().getTerm().get(), leaderCommit, logManager.getTerm(leaderCommit));
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
index 3f988e78f4..14745a510d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/AsyncLogApplier.java
@@ -21,28 +21,26 @@ package org.apache.iotdb.consensus.natraft.protocol.log.applier;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.IStateMachine;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.function.Consumer;
public class AsyncLogApplier implements LogApplier {
private static final Logger logger = LoggerFactory.getLogger(AsyncLogApplier.class);
- private static final int CONCURRENT_CONSUMER_NUM = Runtime.getRuntime().availableProcessors();
- private RaftConfig config;
+ private static final int CONCURRENT_CONSUMER_NUM = 4;
private LogApplier embeddedApplier;
- private Map<PartialPath, DataLogConsumer> consumerMap;
+ private DataLogConsumer[] consumers;
private ExecutorService consumerPool;
private String name;
@@ -54,11 +52,14 @@ public class AsyncLogApplier implements LogApplier {
public AsyncLogApplier(LogApplier embeddedApplier, String name, RaftConfig config) {
this.embeddedApplier = embeddedApplier;
- consumerMap = new HashMap<>();
+ consumers = new DataLogConsumer[CONCURRENT_CONSUMER_NUM];
consumerPool =
IoTDBThreadPoolFactory.newFixedThreadPool(CONCURRENT_CONSUMER_NUM, "ApplierThread");
+ for (int i = 0; i < consumers.length; i++) {
+ consumers[i] = new DataLogConsumer(name + "-" + i, config.getMaxNumOfLogsInMem());
+ consumerPool.submit(consumers[i]);
+ }
this.name = name;
- this.config = config;
}
@Override
@@ -71,12 +72,18 @@ public class AsyncLogApplier implements LogApplier {
// the consumers will never be drained
public synchronized void apply(Entry e) {
- PartialPath logKey = getLogKey(e);
-
- if (logKey != null) {
- // this plan only affects one sg, so we can run it with other plans in parallel
- provideLogToConsumers(logKey, e);
- return;
+ if (e instanceof RequestEntry) {
+ RequestEntry requestEntry = (RequestEntry) e;
+ IConsensusRequest request = requestEntry.getRequest();
+ request = getStateMachine().deserializeRequest(request);
+ requestEntry.setRequest(request);
+
+ PartialPath logKey = getLogKey(request);
+ if (logKey != null) {
+ // this plan only affects one sg, so we can run it with other plans in parallel
+ provideLogToConsumers(logKey, e);
+ return;
+ }
}
logger.debug("{}: {} is waiting for consumers to drain", name, e);
@@ -84,13 +91,12 @@ public class AsyncLogApplier implements LogApplier {
applyInternal(e);
}
- private PartialPath getLogKey(Entry e) {
- // TODO-raft: implement
- return null;
+ private PartialPath getLogKey(IConsensusRequest e) {
+ return e.conflictKey();
}
private void provideLogToConsumers(PartialPath planKey, Entry e) {
- consumerMap.computeIfAbsent(planKey, d -> new DataLogConsumer(name + "-" + d)).accept(e);
+ consumers[Math.abs(planKey.hashCode()) % CONCURRENT_CONSUMER_NUM].accept(e);
}
private void drainConsumers() {
@@ -108,7 +114,7 @@ public class AsyncLogApplier implements LogApplier {
}
private boolean allConsumersEmpty() {
- for (DataLogConsumer consumer : consumerMap.values()) {
+ for (DataLogConsumer consumer : consumers) {
if (!consumer.isEmpty()) {
if (logger.isDebugEnabled()) {
logger.debug("Consumer not empty: {}", consumer);
@@ -125,14 +131,14 @@ public class AsyncLogApplier implements LogApplier {
private class DataLogConsumer implements Runnable, Consumer<Entry> {
- private BlockingQueue<Entry> logQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
+ private BlockingQueue<Entry> logQueue;
private volatile long lastLogIndex;
private volatile long lastAppliedLogIndex;
private String name;
- private Future<?> future;
- public DataLogConsumer(String name) {
+ public DataLogConsumer(String name, int queueCapacity) {
this.name = name;
+ this.logQueue = new ArrayBlockingQueue<>(queueCapacity);
}
public boolean isEmpty() {
@@ -174,20 +180,6 @@ public class AsyncLogApplier implements LogApplier {
@Override
public void accept(Entry e) {
- if (future == null || future.isCancelled() || future.isDone()) {
- if (future != null) {
- try {
- future.get();
- } catch (InterruptedException ex) {
- logger.error("Last applier thread exits unexpectedly", ex);
- Thread.currentThread().interrupt();
- } catch (ExecutionException ex) {
- logger.error("Last applier thread exits unexpectedly", ex);
- }
- }
- future = consumerPool.submit(this);
- }
-
try {
lastLogIndex = e.getCurrLogIndex();
logQueue.put(e);
@@ -214,4 +206,9 @@ public class AsyncLogApplier implements LogApplier {
+ '}';
}
}
+
+ @Override
+ public IStateMachine getStateMachine() {
+ return embeddedApplier.getStateMachine();
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
index 646159877f..c38e5fc260 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/BaseApplier.java
@@ -71,4 +71,9 @@ public class BaseApplier implements LogApplier {
request = stateMachine.deserializeRequest(request);
return stateMachine.write(request);
}
+
+ @Override
+ public IStateMachine getStateMachine() {
+ return stateMachine;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java
index e2ddfdacf0..c2e0821502 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/applier/LogApplier.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.consensus.natraft.protocol.log.applier;
+import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
/** LogApplier applies the log to the local node to make it take effect. */
@@ -34,4 +35,6 @@ public interface LogApplier {
void apply(Entry e);
default void close() {}
+
+ IStateMachine getStateMachine();
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 7b5f9f5dc2..4d90e0d9b2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -69,10 +69,7 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
long resp = response.status;
if (resp == RESPONSE_STRONG_ACCEPT || resp == RESPONSE_AGREE) {
- member
- .getVotingLogList()
- .onStronglyAccept(
- log, trueReceiver);
+ member.getVotingLogList().onStronglyAccept(log, trueReceiver);
member.getStatus().getPeerMap().get(trueReceiver).setMatchIndex(response.lastLogIndex);
} else if (resp > 0) {
@@ -136,7 +133,6 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
}
}
-
public void setLog(VotingEntry log) {
this.log = log;
}
@@ -148,5 +144,4 @@ public class AppendNodeEntryHandler implements AsyncMethodCallback<AppendEntryRe
public void setDirectReceiver(Peer follower) {
this.directReceiver = follower;
}
-
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 07f3bc3929..27ea860aad 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -19,11 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.utils.TestOnly;
@@ -38,13 +33,13 @@ import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import com.google.common.util.concurrent.RateLimiter;
-import org.apache.thrift.TException;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -55,6 +50,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import static org.apache.iotdb.consensus.natraft.utils.NodeUtils.unionNodes;
+
/**
* A LogDispatcher serves a raft leader by queuing logs that the leader wants to send to its
* followers and send the logs in an ordered manner so that the followers will not wait for previous
@@ -101,9 +98,6 @@ public class LogDispatcher {
}
}
-
-
-
void createQueue(Peer node) {
BlockingQueue<VotingEntry> logBlockingQueue;
logBlockingQueue = new ArrayBlockingQueue<>(config.getMaxNumOfLogsInMem());
@@ -112,9 +106,7 @@ public class LogDispatcher {
for (int i = 0; i < bindingThreadNum; i++) {
executorServices
- .computeIfAbsent(
- node,
- n -> createPool(node))
+ .computeIfAbsent(node, n -> createPool(node))
.submit(newDispatcherThread(node, logBlockingQueue));
}
}
@@ -134,7 +126,7 @@ public class LogDispatcher {
void createQueueAndBindingThreads(Collection<Peer> peers) {
for (Peer node : peers) {
if (!node.equals(member.getThisNode())) {
- createQueue(node);
+ createQueue(node);
}
}
updateRateLimiter();
@@ -154,7 +146,13 @@ public class LogDispatcher {
}
protected boolean addToQueue(BlockingQueue<VotingEntry> nodeLogQueue, VotingEntry request) {
- return nodeLogQueue.add(request);
+ synchronized (nodeLogQueue) {
+ boolean added = nodeLogQueue.add(request);
+ if (added) {
+ nodeLogQueue.notifyAll();
+ }
+ return added;
+ }
}
public void offer(VotingEntry request) {
@@ -228,12 +226,13 @@ public class LogDispatcher {
try {
while (!Thread.interrupted()) {
synchronized (logBlockingDeque) {
- VotingEntry poll = logBlockingDeque.take();
- currBatch.add(poll);
- if (maxBatchSize > 1) {
- while (!logBlockingDeque.isEmpty() && currBatch.size() < maxBatchSize) {
- currBatch.add(logBlockingDeque.take());
- }
+ VotingEntry poll = logBlockingDeque.poll();
+ if (poll != null) {
+ currBatch.add(poll);
+ logBlockingDeque.drainTo(currBatch, maxBatchSize);
+ } else {
+ logBlockingDeque.wait(10);
+ continue;
}
}
if (logger.isDebugEnabled()) {
@@ -300,7 +299,11 @@ public class LogDispatcher {
return request;
}
- private void sendLogs(List<VotingEntry> currBatch) throws TException {
+ private void sendLogs(List<VotingEntry> currBatch) {
+ if (currBatch.isEmpty()) {
+ return;
+ }
+
int logIndex = 0;
logger.debug(
"send logs from index {} to {}",
@@ -331,8 +334,7 @@ public class LogDispatcher {
}
}
- public AppendNodeEntryHandler getAppendNodeEntryHandler(
- VotingEntry log, Peer node) {
+ public AppendNodeEntryHandler getAppendNodeEntryHandler(VotingEntry log, Peer node) {
AppendNodeEntryHandler handler = new AppendNodeEntryHandler();
handler.setDirectReceiver(node);
handler.setLog(log);
@@ -347,8 +349,7 @@ public class LogDispatcher {
private AppendEntriesHandler(List<VotingEntry> batch) {
singleEntryHandlers = new ArrayList<>(batch.size());
for (VotingEntry sendLogRequest : batch) {
- AppendNodeEntryHandler handler =
- getAppendNodeEntryHandler(sendLogRequest, receiver);
+ AppendNodeEntryHandler handler = getAppendNodeEntryHandler(sendLogRequest, receiver);
singleEntryHandlers.add(handler);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
index 7a097e1f4d..15f845abbe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/VotingLogList.java
@@ -19,17 +19,20 @@
package org.apache.iotdb.consensus.natraft.protocol.log.dispatch;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.exception.LogExecutionException;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
public class VotingLogList {
private static final Logger logger = LoggerFactory.getLogger(VotingLogList.class);
@@ -80,7 +83,8 @@ public class VotingLogList {
* @return the lastly removed entry if any.
*/
public void onStronglyAccept(VotingEntry entry, Peer acceptingNode) {
- logger.debug("{} is strongly accepted by {}", entry, acceptingNode);
+ logger.debug(
+ "{} is strongly accepted by {}; {}", entry, acceptingNode, stronglyAcceptedIndices);
long currLogIndex = entry.getEntry().getCurrLogIndex();
Long newIndex =
@@ -96,6 +100,9 @@ public class VotingLogList {
return oldIndex;
}
});
+ entry.getEntry().acceptedTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT.add(
+ entry.getEntry().acceptedTime - entry.getEntry().createTime);
if (newIndex == currLogIndex) {
tryCommit(entry);
}
@@ -119,16 +126,16 @@ public class VotingLogList {
if (enableWeakAcceptance) {
int currNodeQuorumNum = votingEntry.currNodesQuorumNum();
int newNodeQuorumNum = votingEntry.newNodesQuorumNum();
- int stronglyAcceptedNumByCurrNodes = votingEntry.stronglyAcceptedNumByCurrNodes(
- stronglyAcceptedIndices);
- int stronglyAcceptedNumByNewNodes = votingEntry.stronglyAcceptedNumByNewNodes(
- stronglyAcceptedIndices);
- int weaklyAcceptedNumByCurrNodes = votingEntry.weaklyAcceptedNumByCurrNodes(
- stronglyAcceptedIndices);
- int weaklyAcceptedNumByNewNodes = votingEntry.weaklyAcceptedNumByNewNodes(
- stronglyAcceptedIndices);
- if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >= currNodeQuorumNum &&
- (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >= newNodeQuorumNum) {
+ int stronglyAcceptedNumByCurrNodes =
+ votingEntry.stronglyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+ int stronglyAcceptedNumByNewNodes =
+ votingEntry.stronglyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+ int weaklyAcceptedNumByCurrNodes =
+ votingEntry.weaklyAcceptedNumByCurrNodes(stronglyAcceptedIndices);
+ int weaklyAcceptedNumByNewNodes =
+ votingEntry.weaklyAcceptedNumByNewNodes(stronglyAcceptedIndices);
+ if ((weaklyAcceptedNumByCurrNodes + stronglyAcceptedNumByCurrNodes) >= currNodeQuorumNum
+ && (weaklyAcceptedNumByNewNodes + stronglyAcceptedNumByNewNodes) >= newNodeQuorumNum) {
return AcceptedType.WEAKLY_ACCEPTED;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
index 6cecfa1b9e..1e59fccc49 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/ConfigChangeEntry.java
@@ -19,14 +19,15 @@
package org.apache.iotdb.consensus.natraft.protocol.log.logtype;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
+
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
public class ConfigChangeEntry extends Entry {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index befe8ef655..a76653dac4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
-import java.util.function.Consumer;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
@@ -34,6 +33,7 @@ import org.apache.iotdb.consensus.natraft.utils.IOUtils;
import java.io.File;
import java.nio.file.Path;
import java.util.List;
+import java.util.function.Consumer;
public class DirectorySnapshotRaftLogManager extends RaftLogManager {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index 3fdcf963d2..804328305d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
-import java.util.function.Consumer;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.IStateMachine;
@@ -33,6 +32,7 @@ import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.LogManagerMeta;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
+import org.apache.iotdb.consensus.natraft.utils.Timer.Statistic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
public abstract class RaftLogManager {
@@ -330,12 +331,11 @@ public abstract class RaftLogManager {
*
* @param lastIndex leader's matchIndex for this follower node
* @param lastTerm the entry's term which index is leader's matchIndex for this follower node
- * @param leaderCommit leader's commitIndex
* @param entries entries sent from the leader node Note that the leader must ensure
* entries[0].index = lastIndex + 1
* @return -1 if the entries cannot be appended, otherwise the last index of new entries
*/
- public long maybeAppend(long lastIndex, long lastTerm, long leaderCommit, List<Entry> entries) {
+ public long maybeAppend(long lastIndex, long lastTerm, List<Entry> entries) {
try {
lock.writeLock().lock();
if (matchTerm(lastTerm, lastIndex)) {
@@ -362,11 +362,6 @@ public abstract class RaftLogManager {
long offset = lastIndex + 1;
append(entries.subList((int) (ci - offset), entries.size()));
}
- try {
- commitTo(Math.min(leaderCommit, newLastIndex));
- } catch (LogExecutionException e) {
- // exceptions are ignored on follower side
- }
return newLastIndex;
}
return -1;
@@ -590,6 +585,13 @@ public abstract class RaftLogManager {
// Cluster could continue provide service when exception is thrown here
getStableEntryManager().append(entries, appliedIndex);
}
+ for (Entry entry : entries) {
+ if (entry.createTime != 0) {
+ entry.committedTime = System.nanoTime();
+ Statistic.RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT.add(
+ entry.committedTime - entry.createTime);
+ }
+ }
} catch (IOException e) {
// The exception will block the raft service continue accept log.
// TODO: Notify user that the persisted logs before these entries(include) are corrupted.
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
index ed65209947..18f12ec8fe 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/LogSequencerFactory.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.consensus.natraft.protocol.log.sequencing;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
public interface LogSequencerFactory {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
index aaba4ae2c6..624bf0af85 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/sequencing/SynchronousSequencer.java
@@ -47,8 +47,6 @@ public class SynchronousSequencer implements LogSequencer {
this.config = config;
}
-
-
@Override
public VotingEntry sequence(Entry e) {
VotingEntry votingEntry = null;
@@ -68,6 +66,7 @@ public class SynchronousSequencer implements LogSequencer {
e.setCurrLogTerm(member.getStatus().getTerm().get());
e.setCurrLogIndex(lastIndex + 1);
e.setPrevTerm(lastTerm);
+ e.setFromThisNode(true);
// logDispatcher will serialize log, and set log size, and we will use the size after it
logManager.append(Collections.singletonList(e));
@@ -101,7 +100,6 @@ public class SynchronousSequencer implements LogSequencer {
return votingEntry;
}
-
public static class Factory implements LogSequencerFactory {
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index 359bb1393b..e27fdb86c4 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,7 +4,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
-import java.io.DataOutputStream;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.Peer;
@@ -12,13 +11,14 @@ import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.rpc.TSStatusCode;
-
import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
@@ -37,9 +37,7 @@ public class DirectorySnapshot extends Snapshot {
private TEndPoint source;
private String memberName;
- public DirectorySnapshot() {
-
- }
+ public DirectorySnapshot() {}
public DirectorySnapshot(File directory, List<Path> filePaths, List<Peer> peers) {
this.directory = directory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
index b5aa2ae455..7ade751b0b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/Snapshot.java
@@ -19,15 +19,15 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
/**
* As we can only hold a certain amount of logs in memory, when the logs' size exceed the memory
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 08793449fd..df31eeba40 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -61,8 +61,7 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
this.consensus = consensus;
}
- public void handleClientExit() {
- }
+ public void handleClientExit() {}
private RaftMember getMember(TConsensusGroupId groupId) throws TException {
RaftMember member = consensus.getMember(Factory.createFromTConsensusGroupId(groupId));
@@ -98,16 +97,19 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
ConfigChangeEntry lastConfigChangeEntry = findFirstConfigChangeEntry(request);
if (lastConfigChangeEntry != null) {
Peer thisPeer = new Peer(groupId, consensus.getThisNodeId(), consensus.getThisNode());
- consensus.createNewMemberIfAbsent(groupId, thisPeer, lastConfigChangeEntry.getOldPeers(),
+ consensus.createNewMemberIfAbsent(
+ groupId,
+ thisPeer,
+ lastConfigChangeEntry.getOldPeers(),
lastConfigChangeEntry.getNewPeers());
return consensus.getMember(groupId);
}
} catch (UnknownLogTypeException e) {
throw new TException(e.getMessage());
}
-
+ throw new NoMemberException("No such member of: " + tgroupId);
}
- throw new NoMemberException("No such member of: " + tgroupId);
+ return member;
}
@Override
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
index d9f137277f..745e1d5fef 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/LogUtils.java
@@ -17,17 +17,18 @@
* under the License.
*/
-
package org.apache.iotdb.consensus.natraft.utils;
-import java.nio.ByteBuffer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingEntry;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+
public class LogUtils {
private static final Logger logger = LoggerFactory.getLogger(LogUtils.class);
@@ -41,8 +42,8 @@ public class LogUtils {
return votingEntry;
}
- public static AppendEntryRequest buildAppendEntryRequest(Entry entry, boolean serializeNow,
- RaftMember member) {
+ public static AppendEntryRequest buildAppendEntryRequest(
+ Entry entry, boolean serializeNow, RaftMember member) {
AppendEntryRequest request = new AppendEntryRequest();
request.setTerm(member.getStatus().getTerm().get());
if (serializeNow) {
@@ -76,6 +77,4 @@ public class LogUtils {
}
return sendLogRequest;
}
-
-
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
index dc1ec45817..d149827c64 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/NodeUtils.java
@@ -1,11 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
package org.apache.iotdb.consensus.natraft.utils;
+import org.apache.iotdb.consensus.common.Peer;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.iotdb.consensus.common.Peer;
public class NodeUtils {
@@ -28,5 +48,4 @@ public class NodeUtils {
nodeUnion.addAll(newNodes);
return nodeUnion;
}
-
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
index ba01f7983f..7227b18940 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
@@ -24,9 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse.Builder;
import org.apache.iotdb.consensus.exception.ConsensusException;
-import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient.Factory;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.checkerframework.checker.units.qual.C;
public class StatusUtils {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
new file mode 100644
index 0000000000..ba3a85ac31
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Timer.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.natraft.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+/**
+ * Process-wide instrumentation for the native Raft implementation. Each {@link Statistic}
+ * accumulates the sum, count and maximum of the values passed to {@link Statistic#add(long)},
+ * both since start-up (or the last reset) and since it was last printed; {@link
+ * Statistic#getReport()} renders all non-empty statistics as an indented tree.
+ */
+public class Timer {
+
+  private static final Logger logger = LoggerFactory.getLogger(Timer.class);
+
+  /** Global switch: when false, statistics record nothing and the report is empty. */
+  public static final boolean ENABLE_INSTRUMENTING = true;
+
+  private static final String COORDINATOR = "Coordinator";
+  private static final String META_GROUP_MEMBER = "Meta group member";
+  private static final String DATA_GROUP_MEMBER = "Data group member";
+  private static final String RAFT_MEMBER_SENDER = " Raft member(sender)";
+  private static final String RAFT_MEMBER_RECEIVER = " Raft member(receiver)";
+  private static final String LOG_DISPATCHER = "Log dispatcher";
+
+  // convert nano to milli
+  private static final double TIME_SCALE = 1_000_000.0;
+
+  /**
+   * A named measurement point. Statistics form a tree (rooted at {@link #ROOT}) that only affects
+   * how the report is indented; a child's values are not aggregated into its parent.
+   */
+  public enum Statistic {
+    // A dummy root for the convenience of prints
+    ROOT("ClassName", "BlockName", TIME_SCALE, true, null),
+    // coordinator
+    COORDINATOR_EXECUTE_NON_QUERY(COORDINATOR, "execute non query", TIME_SCALE, true, ROOT),
+
+    // meta group member
+    META_GROUP_MEMBER_EXECUTE_NON_QUERY(
+        META_GROUP_MEMBER, "execute non query", TIME_SCALE, true, COORDINATOR_EXECUTE_NON_QUERY),
+    META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP(
+        META_GROUP_MEMBER,
+        "execute in local group",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY),
+    META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_REMOTE_GROUP(
+        META_GROUP_MEMBER,
+        "execute in remote group",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY),
+    // data group member
+    DATA_GROUP_MEMBER_LOCAL_EXECUTION(
+        DATA_GROUP_MEMBER,
+        "execute locally",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    DATA_GROUP_MEMBER_WAIT_LEADER(
+        DATA_GROUP_MEMBER,
+        "wait for leader",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    DATA_GROUP_MEMBER_FORWARD_PLAN(
+        DATA_GROUP_MEMBER,
+        "forward to leader",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    // raft member - sender
+    RAFT_SENDER_SEQUENCE_LOG(
+        RAFT_MEMBER_SENDER, "sequence log", TIME_SCALE, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY),
+    RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_APPEND_V2(
+        RAFT_MEMBER_SENDER,
+        "compete for log manager before append",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_OCCUPY_LOG_MANAGER_IN_APPEND(
+        RAFT_MEMBER_SENDER,
+        "occupy log manager in append",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_APPEND_LOG_V2(
+        RAFT_MEMBER_SENDER,
+        "locally append log",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_BUILD_LOG_REQUEST(
+        RAFT_MEMBER_SENDER,
+        "build SendLogRequest",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_BUILD_APPEND_REQUEST(
+        RAFT_MEMBER_SENDER,
+        "build AppendEntryRequest",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_BUILD_LOG_REQUEST),
+    RAFT_SENDER_OFFER_LOG(
+        RAFT_MEMBER_SENDER,
+        "offer log to dispatcher",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_COMMIT_LOG(
+        RAFT_MEMBER_SENDER,
+        "locally commit log",
+        TIME_SCALE,
+        true,
+        DATA_GROUP_MEMBER_LOCAL_EXECUTION),
+    RAFT_SENDER_COMPETE_LOG_MANAGER_BEFORE_COMMIT(
+        RAFT_MEMBER_SENDER,
+        "compete for log manager before commit",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_LOG),
+    RAFT_COMMIT_LOG_IN_MANAGER(
+        RAFT_MEMBER_SENDER, "commit log in log manager", TIME_SCALE, true, RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_EXIT_LOG_MANAGER(
+        RAFT_MEMBER_SENDER,
+        "exiting log manager synchronizer",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_COMMIT_GET_LOGS(
+        RAFT_MEMBER_SENDER,
+        "get logs to be committed",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_DELETE_EXCEEDING_LOGS(
+        RAFT_MEMBER_SENDER,
+        "delete logs exceeding capacity",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_APPEND_AND_STABLE_LOGS(
+        RAFT_MEMBER_SENDER,
+        "append and stable committed logs",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_APPLY_LOGS(
+        RAFT_MEMBER_SENDER,
+        "apply after committing logs",
+        TIME_SCALE,
+        true,
+        RAFT_COMMIT_LOG_IN_MANAGER),
+    RAFT_SENDER_COMMIT_TO_CONSUMER_LOGS(
+        RAFT_MEMBER_SENDER,
+        "provide log to consumer",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_APPLY_LOGS),
+    RAFT_SENDER_COMMIT_EXCLUSIVE_LOGS(
+        RAFT_MEMBER_SENDER,
+        "apply logs that cannot run in parallel",
+        TIME_SCALE,
+        true,
+        RAFT_SENDER_COMMIT_APPLY_LOGS),
+    RAFT_SENDER_COMMIT_WAIT_LOG_APPLY(
+        RAFT_MEMBER_SENDER, "wait until log is applied", TIME_SCALE, true, RAFT_SENDER_COMMIT_LOG),
+    RAFT_SENDER_IN_APPLY_QUEUE(
+        RAFT_MEMBER_SENDER, "in apply queue", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
+    RAFT_SENDER_DATA_LOG_APPLY(
+        RAFT_MEMBER_SENDER, "apply data log", TIME_SCALE, true, RAFT_SENDER_COMMIT_WAIT_LOG_APPLY),
+    // raft member - receiver
+    RAFT_RECEIVER_WAIT_FOR_PREV_LOG(
+        RAFT_MEMBER_RECEIVER,
+        "receiver wait for prev log",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ENTRY(
+        RAFT_MEMBER_RECEIVER,
+        "append entrys",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ACK(
+        RAFT_MEMBER_RECEIVER,
+        "ack append entrys",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_APPEND_ENTRY_FULL(
+        RAFT_MEMBER_RECEIVER,
+        "append entrys(full)",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_HANDLE_APPEND_ACK(
+        RAFT_MEMBER_SENDER,
+        "handle append entrys ack",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_RECEIVER_INDEX_DIFF(RAFT_MEMBER_RECEIVER, "index diff", 1.0, true, ROOT),
+    // log dispatcher
+    LOG_DISPATCHER_LOG_ENQUEUE(
+        LOG_DISPATCHER,
+        "enqueue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_ENQUEUE_SINGLE(
+        LOG_DISPATCHER,
+        "enqueue (single)",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_IN_QUEUE(
+        LOG_DISPATCHER,
+        "in queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_LOG_BATCH_SIZE(
+        LOG_DISPATCHER, "batch size", 1, true, META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_RECEIVE_TO_CREATE(
+        LOG_DISPATCHER,
+        "from receive to create",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_ENQUEUE(
+        LOG_DISPATCHER,
+        "from create to queue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_DEQUEUE(
+        LOG_DISPATCHER,
+        "from create to dequeue",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENDING(
+        LOG_DISPATCHER,
+        "from create to sending",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_SENT(
+        LOG_DISPATCHER,
+        "from create to sent",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_ACCEPT(
+        LOG_DISPATCHER,
+        "from create to accept",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_COMMIT(
+        LOG_DISPATCHER,
+        "from create to commit",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_FROM_CREATE_TO_APPLIED(
+        LOG_DISPATCHER,
+        "from create to OK",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPEND_END(
+        LOG_DISPATCHER,
+        "from create to wait append end",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_SENDER_LOG_FROM_CREATE_TO_WAIT_APPLY_END(
+        LOG_DISPATCHER,
+        "from create to wait apply end",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    LOG_DISPATCHER_TOTAL(
+        LOG_DISPATCHER,
+        "total process time",
+        TIME_SCALE,
+        true,
+        META_GROUP_MEMBER_EXECUTE_NON_QUERY_IN_LOCAL_GROUP),
+    RAFT_WINDOW_LENGTH(RAFT_MEMBER_RECEIVER, "window length", 1, true, ROOT),
+    RAFT_RELAYED_ENTRY(RAFT_MEMBER_RECEIVER, "number of relayed entries", 1, true, ROOT),
+    RAFT_SEND_RELAY_ACK(RAFT_MEMBER_RECEIVER, "send relay ack", 1, true, ROOT),
+    RAFT_SENT_ENTRY_SIZE(RAFT_MEMBER_SENDER, "sent entry size", 1, true, ROOT),
+    DISPATCHER_QUEUE_LENGTH(RAFT_MEMBER_SENDER, "dispatcher queue length", 1, true, ROOT),
+    RAFT_RELAYED_LEVEL1_NUM(RAFT_MEMBER_SENDER, "level 1 relay node number", 1, true, ROOT),
+    RAFT_RECEIVE_RELAY_ACK(RAFT_MEMBER_SENDER, "receive relay ack", 1, true, ROOT),
+    RAFT_SENDER_OOW(RAFT_MEMBER_SENDER, "out of window", 1, true, ROOT),
+    RAFT_WEAK_ACCEPT(RAFT_MEMBER_SENDER, "weak accept", 1, true, ROOT),
+    RAFT_CONCURRENT_SENDER(RAFT_MEMBER_SENDER, "concurrent sender", 1, true, ROOT),
+    RAFT_INDEX_BLOCKER(RAFT_MEMBER_SENDER, "index blocker", 1, true, ROOT),
+    RAFT_APPEND_BLOCKER(RAFT_MEMBER_SENDER, "append blocker", 1, true, ROOT),
+    RAFT_APPLY_BLOCKER(RAFT_MEMBER_SENDER, "apply blocker", 1, true, ROOT);
+
+    // The maxima are volatile and updated through these field updaters so that concurrent add()
+    // calls cannot lose a larger value, as a plain read-compare-write on a long field would.
+    private static final AtomicLongFieldUpdater<Statistic> MAX_UPDATER =
+        AtomicLongFieldUpdater.newUpdater(Statistic.class, "max");
+    private static final AtomicLongFieldUpdater<Statistic> INTERVAL_MAX_UPDATER =
+        AtomicLongFieldUpdater.newUpdater(Statistic.class, "intervalMax");
+
+    String className;
+    String blockName;
+    AtomicLong sum = new AtomicLong(0);
+    AtomicLong counter = new AtomicLong(0);
+    AtomicLong intervalSum = new AtomicLong(0);
+    AtomicLong intervalCounter = new AtomicLong(0);
+    volatile long max;
+    volatile long intervalMax;
+    // divisor applied to sums and averages when printing (TIME_SCALE turns nanoseconds into ms)
+    double scale;
+    // statistics that are not valid are not printed in the report
+    boolean valid;
+    // depth in the tree, used for indentation in the report; ROOT has level -1
+    int level;
+    Statistic parent;
+    List<Statistic> children = new ArrayList<>();
+
+    Statistic(String className, String blockName, double scale, boolean valid, Statistic parent) {
+      this.className = className;
+      this.blockName = blockName;
+      this.scale = scale;
+      this.valid = valid;
+      this.parent = parent;
+      if (parent == null) {
+        level = -1;
+      } else {
+        level = parent.level + 1;
+        parent.children.add(this);
+      }
+    }
+
+    /**
+     * Records one sample. Does nothing when instrumenting is disabled.
+     *
+     * @param val the sample, e.g. an elapsed time in nanoseconds or a plain count
+     */
+    public void add(long val) {
+      if (ENABLE_INSTRUMENTING) {
+        sum.addAndGet(val);
+        counter.incrementAndGet();
+        intervalSum.addAndGet(val);
+        intervalCounter.incrementAndGet();
+        MAX_UPDATER.accumulateAndGet(this, val, Math::max);
+        INTERVAL_MAX_UPDATER.accumulateAndGet(this, val, Math::max);
+      }
+    }
+
+    /**
+     * @return System.nanoTime() if ENABLE_INSTRUMENTING is true, else {@link Long#MIN_VALUE}, which
+     *     {@link #calOperationCostTimeFromStart(long)} treats as "not started"
+     */
+    public long getOperationStartTime() {
+      if (ENABLE_INSTRUMENTING) {
+        return System.nanoTime();
+      }
+      return Long.MIN_VALUE;
+    }
+
+    /**
+     * This method equals `add(System.nanoTime() - start)`. We wrap `System.nanoTime()` in this
+     * method to avoid unnecessary calls when instrumenting is disabled. A start time of zero or
+     * {@link Long#MIN_VALUE} is ignored.
+     *
+     * @return the recorded elapsed nanoseconds, or 0 if nothing was recorded
+     */
+    public long calOperationCostTimeFromStart(long startTime) {
+      if (ENABLE_INSTRUMENTING && startTime != Long.MIN_VALUE && startTime != 0) {
+        long consumed = System.nanoTime() - startTime;
+        add(consumed);
+        return consumed;
+      }
+      return 0;
+    }
+
+    /** Clears all accumulated values. Not atomic with respect to concurrent {@link #add(long)}. */
+    public void reset() {
+      sum.set(0);
+      counter.set(0);
+      max = 0;
+      intervalCounter.set(0);
+      intervalSum.set(0);
+      intervalMax = 0;
+    }
+
+    /** Resets every statistic; see {@link #reset()} for the concurrency caveat. */
+    public static void resetAll() {
+      for (Statistic value : values()) {
+        value.reset();
+      }
+    }
+
+    /**
+     * Formats this statistic as "class - block: sum(intervalSum), count(intervalCount),
+     * avg(intervalAvg), max(intervalMax)". Sums and averages are divided by {@code scale}; maxima
+     * are printed unscaled. As a side effect, the interval values are reset so that the next call
+     * reports a fresh interval.
+     */
+    @Override
+    public String toString() {
+      double s = sum.get() / scale;
+      long cnt = counter.get();
+      // Read-and-reset atomically so concurrently added samples are not dropped; the interval max
+      // must be captured before it is reset, otherwise 0 would always be printed.
+      double intervalS = intervalSum.getAndSet(0) / scale;
+      long intervalCnt = intervalCounter.getAndSet(0);
+      long intervalM = INTERVAL_MAX_UPDATER.getAndSet(this, 0);
+      double avg = s / cnt;
+      double intervalAvg = intervalS / intervalCnt;
+      return String.format(
+          "%s - %s: %.4f(%.4f), %d(%d), %.4f(%.4f), %d(%d)",
+          className, blockName, s, intervalS, cnt, intervalCnt, avg, intervalAvg, max, intervalM);
+    }
+
+    /** @return the number of samples recorded since start-up or the last reset */
+    public long getCnt() {
+      return counter.get();
+    }
+
+    /** @return the raw (unscaled) sum of samples since start-up or the last reset */
+    public long getSum() {
+      return sum.get();
+    }
+
+    /**
+     * Renders every valid, non-empty statistic as one line, indented by its depth in the tree.
+     * Printing resets each printed statistic's interval values (see {@link #toString()}).
+     *
+     * @return the report, or an empty string when instrumenting is disabled
+     */
+    public static String getReport() {
+      if (!ENABLE_INSTRUMENTING) {
+        return "";
+      }
+      StringBuilder result = new StringBuilder();
+      printTo(Statistic.ROOT, result);
+      return result.toString();
+    }
+
+    // Depth-first pre-order walk of the tree; ROOT itself is never printed.
+    private static void printTo(Statistic currNode, StringBuilder out) {
+      if (currNode != Statistic.ROOT && currNode.valid) {
+        if (currNode.counter.get() != 0) {
+          indent(out, currNode.level);
+          out.append(currNode).append("\n");
+        }
+      }
+      for (Statistic child : currNode.children) {
+        printTo(child, out);
+      }
+    }
+
+    private static void indent(StringBuilder out, int indents) {
+      for (int i = 0; i < indents; i++) {
+        out.append(" ");
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index 0994935c95..ae890e6a32 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -394,4 +394,9 @@ public abstract class InsertNode extends WritePlanNode {
result = 31 * result + Arrays.hashCode(dataTypes);
return result;
}
+
+  @Override
+  public PartialPath conflictKey() {
+    return this.devicePath;
+  }
}