You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2023/02/20 01:20:47 UTC
[iotdb] branch native_raft updated: optimize package structure
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch native_raft
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/native_raft by this push:
new a02d65c768 optimize package structure
a02d65c768 is described below
commit a02d65c768297dc5d285ef7429a4b9d54fe88a9d
Author: Tian Jiang <jt...@163.com>
AuthorDate: Mon Feb 20 09:22:05 2023 +0800
optimize package structure
---
.../iotdb/consensus/natraft/RaftConsensus.java | 3 +-
.../iotdb/consensus/natraft/Utils/IOUtils.java | 98 ------------------
.../natraft/client/AsyncRaftServiceClient.java | 6 +-
.../natraft/client/ForwardRequestHandler.java | 2 +-
.../natraft/client/RaftConsensusClientPool.java | 3 +-
.../consensus/natraft/protocol/RaftMember.java | 114 +++++++++------------
.../protocol/heartbeat/ElectionReqHandler.java | 2 +-
.../protocol/heartbeat/ElectionRespHandler.java | 10 +-
.../protocol/heartbeat/HeartbeatReqHandler.java | 4 +-
.../protocol/heartbeat/HeartbeatRespHandler.java | 14 +--
.../protocol/log/appender/BlockingLogAppender.java | 2 +-
.../log/appender/SlidingWindowLogAppender.java | 24 ++---
.../protocol/log/catchup/CatchUpManager.java | 1 -
.../natraft/protocol/log/catchup/CatchUpTask.java | 17 ++-
.../log/catchup/LogCatchUpInBatchHandler.java | 9 +-
.../protocol/log/catchup/LogCatchUpTask.java | 18 ++--
.../log/catchup/SnapshotCatchUpHandler.java | 1 -
.../protocol/log/catchup/SnapshotCatchUpTask.java | 1 -
.../log/dispatch/AppendNodeEntryHandler.java | 10 +-
.../protocol/log/dispatch/LogDispatcher.java | 13 ++-
.../{ => dispatch}/flowcontrol/FlowBalancer.java | 2 +-
.../{ => dispatch}/flowcontrol/FlowMonitor.java | 2 +-
.../flowcontrol/FlowMonitorManager.java | 2 +-
.../natraft/protocol/log/logtype/RequestEntry.java | 3 +-
.../log/{ => manager}/CommitLogCallback.java | 2 +-
.../protocol/log/{ => manager}/CommitLogTask.java | 4 +-
.../manager/DirectorySnapshotRaftLogManager.java | 13 ++-
.../protocol/log/manager/RaftLogManager.java | 4 +-
.../serialization/LogManagerMeta.java | 2 +-
.../serialization/StableEntryManager.java | 2 +-
.../serialization/SyncLogDequeSerializer.java | 4 +-
.../protocol/log/snapshot/DirectorySnapshot.java | 3 +-
.../natraft/service/RaftRPCServiceProcessor.java | 6 +-
.../CommitLogCallback.java => utils/IOUtils.java} | 29 ++----
.../natraft/{Utils => utils}/Response.java | 19 +++-
.../natraft/{Utils => utils}/StatusUtils.java | 2 +-
.../apache/iotdb/consensus/raft/ReplicateTest.java | 4 +-
.../iotdb/consensus/raft/util/FakeDataSet.java | 22 +++-
.../iotdb/consensus/raft/util/TestEntry.java | 7 +-
.../consensus/raft/util/TestStateMachine.java | 16 +--
40 files changed, 206 insertions(+), 294 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
index 4e4674152e..81fe9ebeb0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/RaftConsensus.java
@@ -45,8 +45,7 @@ import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
import org.apache.iotdb.consensus.natraft.exception.CheckConsistencyException;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol.FlowMonitor;
-import org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.natraft.service.RaftRPCService;
import org.apache.iotdb.consensus.natraft.service.RaftRPCServiceProcessor;
import org.apache.iotdb.rpc.TSStatusCode;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/IOUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/IOUtils.java
deleted file mode 100644
index cc57b4a0e2..0000000000
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/IOUtils.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.consensus.natraft.Utils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-
-@SuppressWarnings("java:S1135")
-public class IOUtils {
-
- private static final Logger logger = LoggerFactory.getLogger(IOUtils.class);
-
- private IOUtils() {
- // util class
- }
-
- /**
- * An interface that is used for a node to pull chunks of files like TsFiles. The file should be a
- * temporary hard link, and once the file is totally read, it will be removed.
- */
- public static ByteBuffer readFile(String filePath, long offset, int length) throws IOException {
- // TODO-Cluster: hold if the file is an unclosed TsFile
- File file = new File(filePath);
- if (!file.exists()) {
- logger.warn("Reading a non-existing snapshot file {}", filePath);
- return ByteBuffer.allocate(0);
- }
-
- ByteBuffer result;
- boolean fileExhausted;
- int len;
- try (BufferedInputStream bufferedInputStream =
- new BufferedInputStream(new FileInputStream(file))) {
- skipExactly(bufferedInputStream, offset);
- byte[] bytes = new byte[length];
- result = ByteBuffer.wrap(bytes);
- len = bufferedInputStream.read(bytes);
- result.limit(Math.max(len, 0));
- fileExhausted = bufferedInputStream.available() <= 0;
- }
-
- if (fileExhausted) {
- try {
- Files.delete(file.toPath());
- if (logger.isInfoEnabled()) {
- logger.info(
- "Snapshot file {} is exhausted, offset: {}, read length: {}, file length: {}",
- filePath,
- offset,
- len,
- file.length());
- }
- } catch (IOException e) {
- logger.warn("Cannot delete an exhausted file {}", filePath, e);
- }
- }
- return result;
- }
-
- private static void skipExactly(InputStream stream, long byteToSkip) throws IOException {
- while (byteToSkip > 0) {
- byteToSkip -= stream.skip(byteToSkip);
- }
- }
-
- public static Throwable getRootCause(Throwable e) {
- Throwable curr = e;
- while (curr.getCause() != null) {
- curr = curr.getCause();
- }
- return curr;
- }
-}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
index d45ad9dacb..15989f13dc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/AsyncRaftServiceClient.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.client;
-import java.net.ConnectException;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.ClientManager;
import org.apache.iotdb.commons.client.factory.AsyncThriftClientFactory;
@@ -37,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.ConnectException;
public class AsyncRaftServiceClient extends RaftService.AsyncClient {
@@ -91,8 +91,8 @@ public class AsyncRaftServiceClient extends RaftService.AsyncClient {
*/
@Override
public void onError(Exception e) {
- if (e.getCause() instanceof NoMemberException ||
- e instanceof TApplicationException && e.getMessage().contains("No such member")) {
+ if (e.getCause() instanceof NoMemberException
+ || e instanceof TApplicationException && e.getMessage().contains("No such member")) {
logger.debug(e.getMessage());
___currentMethod = null;
returnSelf();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/ForwardRequestHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/ForwardRequestHandler.java
index cab6e2e97c..53720cf47d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/ForwardRequestHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/ForwardRequestHandler.java
@@ -7,7 +7,7 @@ package org.apache.iotdb.consensus.natraft.client;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.natraft.Utils.StatusUtils;
+import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
index 074ed27765..a48e040780 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/client/RaftConsensusClientPool.java
@@ -37,8 +37,7 @@ public class RaftConsensusClientPool {
implements IClientPoolFactory<TEndPoint, AsyncRaftServiceClient> {
private final RaftConfig config;
- private static final String RAFT_CONSENSUS_CLIENT_POOL_THREAD_NAME =
- "RaftConsensusClientPool";
+ private static final String RAFT_CONSENSUS_CLIENT_POOL_THREAD_NAME = "RaftConsensusClientPool";
public AsyncRaftServiceClientPoolFactory(RaftConfig config) {
this.config = config;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
index 8ace88143f..c2da03aa23 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/RaftMember.java
@@ -1,24 +1,24 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
-
-
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+
+
+*/
package org.apache.iotdb.consensus.natraft.protocol;
@@ -33,9 +33,6 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
-import org.apache.iotdb.consensus.natraft.Utils.IOUtils;
-import org.apache.iotdb.consensus.natraft.Utils.Response;
-import org.apache.iotdb.consensus.natraft.Utils.StatusUtils;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.client.GenericHandler;
import org.apache.iotdb.consensus.natraft.client.SyncClientAdaptor;
@@ -48,8 +45,6 @@ import org.apache.iotdb.consensus.natraft.protocol.consistency.StrongCheckConsis
import org.apache.iotdb.consensus.natraft.protocol.heartbeat.ElectionReqHandler;
import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatReqHandler;
import org.apache.iotdb.consensus.natraft.protocol.heartbeat.HeartbeatThread;
-import org.apache.iotdb.consensus.natraft.protocol.log.CommitLogCallback;
-import org.apache.iotdb.consensus.natraft.protocol.log.CommitLogTask;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
@@ -61,16 +56,21 @@ import org.apache.iotdb.consensus.natraft.protocol.log.applier.BaseApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.catchup.CatchUpManager;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.LogDispatcher;
import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.VotingLogList;
-import org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol.FlowBalancer;
-import org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowBalancer;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.RequestEntry;
-import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogCallback;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.CommitLogTask;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.DirectorySnapshotRaftLogManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer;
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencer;
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.LogSequencerFactory;
import org.apache.iotdb.consensus.natraft.protocol.log.sequencing.SynchronousSequencer;
-import org.apache.iotdb.consensus.natraft.protocol.log.serialization.SyncLogDequeSerializer;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
+import org.apache.iotdb.consensus.natraft.utils.IOUtils;
+import org.apache.iotdb.consensus.natraft.utils.Response;
+import org.apache.iotdb.consensus.natraft.utils.StatusUtils;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
@@ -127,9 +127,7 @@ public class RaftMember {
*/
private final Object waitLeaderCondition = new Object();
- /**
- * the lock is to make sure that only one thread can apply snapshot at the same time
- */
+ /** the lock is to make sure that only one thread can apply snapshot at the same time */
private final Lock snapshotApplyLock = new ReentrantLock();
/**
* when the commit progress is updated by a heartbeat, this object is notified so that we may know
@@ -138,9 +136,7 @@ public class RaftMember {
private Object syncLock = new Object();
protected Peer thisNode;
- /**
- * the nodes that belong to the same raft group as thisNode.
- */
+ /** the nodes that belong to the same raft group as thisNode. */
protected List<Peer> allNodes;
protected ConsensusGroupId groupId;
@@ -149,9 +145,7 @@ public class RaftMember {
protected RaftStatus status = new RaftStatus();
- /**
- * the raft logs are all stored and maintained in the log manager
- */
+ /** the raft logs are all stored and maintained in the log manager */
protected RaftLogManager logManager;
protected HeartbeatThread heartbeatThread;
@@ -176,15 +170,12 @@ public class RaftMember {
protected IClientManager<TEndPoint, AsyncRaftServiceClient> clientManager;
protected CatchUpManager catchUpManager;
- /**
- * a thread pool that is used to do commit log tasks asynchronous in heartbeat thread
- */
+ /** a thread pool that is used to do commit log tasks asynchronous in heartbeat thread */
private ExecutorService commitLogPool;
/**
* logDispatcher buff the logs orderly according to their log indexes and send them sequentially,
- * which avoids the followers receiving out-of-order logs, forcing them to wait for previous
- * logs.
+ * which avoids the followers receiving out-of-order logs, forcing them to wait for previous logs.
*/
private volatile LogDispatcher logDispatcher;
@@ -215,8 +206,11 @@ public class RaftMember {
this.groupId = groupId;
this.name =
- groupId.toString() + "-" + thisNode.getEndpoint().getIp() + "-" + thisNode.getEndpoint()
- .getPort();
+ groupId.toString()
+ + "-"
+ + thisNode.getEndpoint().getIp()
+ + "-"
+ + thisNode.getEndpoint().getPort();
this.clientManager = clientManager;
this.stateMachine = stateMachine;
@@ -457,9 +451,7 @@ public class RaftMember {
return appendEntriesInternal(request);
}
- /**
- * Similar to appendEntry, while the incoming load is batch of logs instead of a single log.
- */
+ /** Similar to appendEntry, while the incoming load is batch of logs instead of a single log. */
private AppendEntryResult appendEntriesInternal(AppendEntriesRequest request)
throws UnknownLogTypeException {
logger.debug("{} received an AppendEntriesRequest", name);
@@ -664,7 +656,7 @@ public class RaftMember {
if (log.getEntry().getCurrLogIndex() == Long.MIN_VALUE
|| ((stronglyAccepted < quorumSize
- || (!enableWeakAcceptance || (totalAccepted < quorumSize)) && !log.isHasFailed()))) {
+ || (!enableWeakAcceptance || (totalAccepted < quorumSize)) && !log.isHasFailed()))) {
waitAppendResultLoop(log, quorumSize);
}
totalAccepted = votingLogList.totalAcceptedNodeNum(log);
@@ -724,9 +716,9 @@ public class RaftMember {
synchronized (log.getEntry()) {
while (log.getEntry().getCurrLogIndex() == Long.MIN_VALUE
|| (stronglyAccepted < quorumSize
- && (!(enableWeakAcceptance || (totalAccepted < quorumSize))
- && alreadyWait < config.getWriteOperationTimeoutMS()
- && !log.isHasFailed()))) {
+ && (!(enableWeakAcceptance || (totalAccepted < quorumSize))
+ && alreadyWait < config.getWriteOperationTimeoutMS()
+ && !log.isHasFailed()))) {
try {
log.getEntry().wait(waitTime);
} catch (InterruptedException e) {
@@ -818,7 +810,7 @@ public class RaftMember {
*
* @return true if this node has caught up before timeout, false otherwise
* @throws CheckConsistencyException if leaderCommitId bigger than localAppliedId a threshold
- * value after timeout
+ * value after timeout
*/
protected boolean waitUntilCatchUp(CheckConsistency checkConsistency)
throws CheckConsistencyException {
@@ -850,7 +842,7 @@ public class RaftMember {
* sync local applyId to leader commitId
*
* @param leaderCommitId leader commit id
- * @param fastFail if enabled, when log differ too much, return false directly.
+ * @param fastFail if enabled, when log differ too much, return false directly.
* @return true if leaderCommitId <= localAppliedId
*/
public boolean syncLocalApply(long leaderCommitId, boolean fastFail) {
@@ -897,9 +889,7 @@ public class RaftMember {
return false;
}
- /**
- * Wait until the leader of this node becomes known or time out.
- */
+ /** Wait until the leader of this node becomes known or time out. */
public void waitLeader() {
long startTime = System.currentTimeMillis();
while (status.leader.get() == null || status.leader.get() == null) {
@@ -936,9 +926,7 @@ public class RaftMember {
return handler.getResult(config.getConnectionTimeoutInMS());
}
- /**
- * @return true if there is a log whose index is "index" and term is "term", false otherwise
- */
+ /** @return true if there is a log whose index is "index" and term is "term", false otherwise */
public boolean matchLog(long index, long term) {
boolean matched = logManager.matchTerm(term, index);
logger.debug("Log {}-{} matched: {}", index, term, matched);
@@ -948,10 +936,10 @@ public class RaftMember {
/**
* Forward a non-query plan to a node using the default client.
*
- * @param plan a non-query plan
- * @param node cannot be the local node
+ * @param plan a non-query plan
+ * @param node cannot be the local node
* @param groupId must be set for data group communication, set to null for meta group
- * communication
+ * communication
* @return a TSStatus indicating if the forwarding is successful.
*/
public TSStatus forwardRequest(IConsensusRequest plan, TEndPoint node, ConsensusGroupId groupId) {
@@ -978,7 +966,7 @@ public class RaftMember {
/**
* Forward a non-query plan to "receiver" using "client".
*
- * @param plan a non-query plan
+ * @param plan a non-query plan
* @param groupId to determine which DataGroupMember of "receiver" will process the request.
* @return a TSStatus indicating if the forwarding is successful.
*/
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
index a35b650c49..a39a9524b3 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionReqHandler.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
-import org.apache.iotdb.consensus.natraft.Utils.Response;
+import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.raft.thrift.ElectionRequest;
import org.slf4j.Logger;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
index b260813fa8..52192083ef 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/ElectionRespHandler.java
@@ -31,9 +31,9 @@ import java.net.ConnectException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_AGREE;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_LEADER_STILL_ONLINE;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_LEADER_STILL_ONLINE;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_NODE_IS_NOT_IN_GROUP;
/**
* ElectionHandler checks the result from a voter and decides whether the election goes on, succeeds
@@ -131,8 +131,8 @@ public class ElectionRespHandler implements AsyncMethodCallback<Long> {
public void onError(Exception exception) {
if (exception instanceof ConnectException) {
logger.debug("{}: Cannot connect to {}: {}", memberName, voter, exception.getMessage());
- } else if (exception instanceof TApplicationException && exception.getMessage()
- .contains("No such member")) {
+ } else if (exception instanceof TApplicationException
+ && exception.getMessage().contains("No such member")) {
logger.debug("{}: voter {} not ready: {}", memberName, voter, exception.getMessage());
} else {
logger.warn("{}: A voter {} encountered an error:", memberName, voter, exception);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
index 9fdda2ff73..55d63af7f2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatReqHandler.java
@@ -4,7 +4,7 @@ import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.RaftRole;
-import org.apache.iotdb.consensus.natraft.Utils.Response;
+import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.raft.thrift.HeartBeatRequest;
import org.apache.iotdb.consensus.raft.thrift.HeartBeatResponse;
@@ -46,7 +46,7 @@ public class HeartbeatReqHandler {
member.getName(),
request.getLeader());
}
- } else {
+ } else {
if (leaderTerm > thisTerm || member.getStatus().getLeader().get() == null) {
// try updating local term or leader
try {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
index 6d236381ec..326c5367f1 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/heartbeat/HeartbeatRespHandler.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.heartbeat;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.PeerInfo;
@@ -33,7 +32,7 @@ import org.slf4j.LoggerFactory;
import java.net.ConnectException;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_AGREE;
/**
* HeartbeatHandler checks the response of a heartbeat and decides whether to start a catch-up or
@@ -82,8 +81,11 @@ public class HeartbeatRespHandler implements AsyncMethodCallback<HeartBeatRespon
private void handleNormalHeartbeatResponse(HeartBeatResponse resp) {
// check the necessity of performing a catch-up
- Peer peer = new Peer(ConsensusGroupId.Factory.createFromTConsensusGroupId(resp.groupId),
- resp.followerId, resp.follower);
+ Peer peer =
+ new Peer(
+ ConsensusGroupId.Factory.createFromTConsensusGroupId(resp.groupId),
+ resp.followerId,
+ resp.follower);
long lastLogIdx = resp.getLastLogIndex();
long lastLogTerm = resp.getLastLogTerm();
long localLastLogIdx = localMember.getLogManager().getLastLogIndex();
@@ -143,8 +145,8 @@ public class HeartbeatRespHandler implements AsyncMethodCallback<HeartBeatRespon
public void onError(Exception exception) {
if (exception instanceof ConnectException) {
logger.debug("{}: Cannot connect to {}: {}", memberName, receiver, exception.getMessage());
- } else if (exception instanceof TApplicationException && exception.getMessage()
- .contains("No such member")) {
+ } else if (exception instanceof TApplicationException
+ && exception.getMessage().contains("No such member")) {
logger.debug("{}: node {} not ready: {}", memberName, receiver, exception.getMessage());
} else {
logger.error(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
index 75bba3e290..6c5e54d1d6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/BlockingLogAppender.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.consensus.natraft.protocol.log.appender;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-import org.apache.iotdb.consensus.natraft.Utils.Response;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
index da366f0330..4572895c49 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/appender/SlidingWindowLogAppender.java
@@ -19,20 +19,22 @@
package org.apache.iotdb.consensus.natraft.protocol.log.appender;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import org.apache.iotdb.consensus.natraft.Utils.Response;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+import org.apache.iotdb.consensus.natraft.utils.Response;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
public class SlidingWindowLogAppender implements LogAppender {
private static final Logger logger = LoggerFactory.getLogger(SlidingWindowLogAppender.class);
@@ -135,8 +137,7 @@ public class SlidingWindowLogAppender implements LogAppender {
while (true) {
// TODO: Consider memory footprint to execute a precise rejection
if ((logManager.getCommitLogIndex() - logManager.getAppliedIndex())
- <= config
- .getUnAppliedRaftLogNumForRejectThreshold()) {
+ <= config.getUnAppliedRaftLogNumForRejectThreshold()) {
synchronized (logManager) {
success =
logManager.maybeAppend(windowPrevLogIndex, windowPrevLogTerm, leaderCommit, logs);
@@ -144,8 +145,7 @@ public class SlidingWindowLogAppender implements LogAppender {
}
}
try {
- TimeUnit.MILLISECONDS.sleep(
- config.getCheckPeriodWhenInsertBlocked());
+ TimeUnit.MILLISECONDS.sleep(config.getCheckPeriodWhenInsertBlocked());
if (System.currentTimeMillis() - startWaitingTime
> config.getMaxWaitingTimeWhenInsertBlocked()) {
result.status = Response.RESPONSE_TOO_BUSY;
@@ -206,7 +206,6 @@ public class SlidingWindowLogAppender implements LogAppender {
return result;
}
-
private AppendEntryResult appendEntry(
long prevLogIndex, long prevLogTerm, long leaderCommit, Entry entry) {
long appendedPos = 0;
@@ -216,8 +215,9 @@ public class SlidingWindowLogAppender implements LogAppender {
int windowPos = (int) (entry.getCurrLogIndex() - logManager.getLastLogIndex() - 1);
if (windowPos < 0) {
// the new entry may replace an appended entry
- appendedPos = logManager.maybeAppend(prevLogIndex, prevLogTerm, leaderCommit,
- Collections.singletonList(entry));
+ appendedPos =
+ logManager.maybeAppend(
+ prevLogIndex, prevLogTerm, leaderCommit, Collections.singletonList(entry));
result.status = Response.RESPONSE_STRONG_ACCEPT;
result.setLastLogIndex(logManager.getLastLogIndex());
result.setLastLogTerm(logManager.getLastLogTerm());
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpManager.java
index a6747f9e60..4562f4b583 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
index e75f2cceee..11292d0b26 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/CatchUpTask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -78,7 +77,7 @@ public class CatchUpTask implements Runnable {
/**
* @return true if a matched index is found so that we can use logs only to catch up, or false if
- * the catch up must be done with a snapshot.
+ * the catch up must be done with a snapshot.
* @throws TException
* @throws InterruptedException
*/
@@ -237,7 +236,7 @@ public class CatchUpTask implements Runnable {
/**
* @param index the index of a log in logs
* @return true if the previous log at logs[index] matches a log in the remote node, false if the
- * corresponding log cannot be found
+ * corresponding log cannot be found
* @throws LeaderUnknownException
* @throws TException
* @throws InterruptedException
@@ -270,9 +269,9 @@ public class CatchUpTask implements Runnable {
/**
* @param logIndex the log index needs to check
- * @param logTerm the log term need to check
+ * @param logTerm the log term that needs to be checked
* @return true if the log's index and term matches a log in the remote node, false if the
- * corresponding log cannot be found
+ * corresponding log cannot be found
* @throws TException
* @throws InterruptedException
*/
@@ -284,8 +283,8 @@ public class CatchUpTask implements Runnable {
return false;
}
matched =
- SyncClientAdaptor.matchTerm(client, node.getEndpoint(), logIndex, logTerm,
- raftMember.getRaftGroupId());
+ SyncClientAdaptor.matchTerm(
+ client, node.getEndpoint(), logIndex, logTerm, raftMember.getRaftGroupId());
return matched;
}
@@ -307,9 +306,7 @@ public class CatchUpTask implements Runnable {
}
}
- /**
- * Remove logs that are contained in the snapshot.
- */
+ /** Remove logs that are contained in the snapshot. */
private void removeSnapshotLogs() {
Entry logToSearch = new EmptyEntry(snapshot.getLastLogIndex(), snapshot.getLastLogTerm());
int pos =
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
index 8fb954e691..e6892886c8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpInBatchHandler.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -32,10 +31,10 @@ import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_AGREE;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_LOG_MISMATCH;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_STRONG_ACCEPT;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_WEAK_ACCEPT;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_WEAK_ACCEPT;
public class LogCatchUpInBatchHandler implements AsyncMethodCallback<AppendEntryResult> {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
index 15f19f455b..13e770f9e7 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/LogCatchUpTask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
@@ -54,8 +53,7 @@ public class LogCatchUpTask implements Callable<Boolean> {
boolean abort = false;
protected RaftConfig config;
- LogCatchUpTask(
- List<Entry> logs, Peer node, CatchUpManager catchUpManager, RaftConfig config) {
+ LogCatchUpTask(List<Entry> logs, Peer node, CatchUpManager catchUpManager, RaftConfig config) {
this.logs = logs;
this.node = node;
this.raftMember = catchUpManager.getMember();
@@ -72,15 +70,13 @@ public class LogCatchUpTask implements Callable<Boolean> {
request.setLeaderId(raftMember.getThisNode().getNodeId());
request.setLeaderCommit(raftMember.getLogManager().getCommitLogIndex());
- synchronized (raftMember.getStatus().getTerm()) {
- // make sure this node is still a leader
- if (raftMember.getRole() != RaftRole.LEADER) {
- logger.debug("Leadership is lost when doing a catch-up to {}, aborting", node);
- abort = true;
- return null;
- }
- request.setTerm(raftMember.getStatus().getTerm().get());
+ // make sure this node is still a leader
+ if (raftMember.getRole() != RaftRole.LEADER) {
+ logger.debug("Leadership is lost when doing a catch-up to {}, aborting", node);
+ abort = true;
+ return null;
}
+ request.setTerm(raftMember.getStatus().getTerm().get());
request.setEntries(logList);
// set index for raft
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
index 205cb4fa38..56af324502 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpHandler.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
index 08cdbd198c..437c25d645 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/catchup/SnapshotCatchUpTask.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.consensus.natraft.protocol.log.catchup;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.exception.LeaderUnknownException;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
index 34cc70f3bf..9a838f26de 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/AppendNodeEntryHandler.java
@@ -31,11 +31,11 @@ import org.slf4j.LoggerFactory;
import java.net.ConnectException;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_AGREE;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_LOG_MISMATCH;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_OUT_OF_WINDOW;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_STRONG_ACCEPT;
-import static org.apache.iotdb.consensus.natraft.Utils.Response.RESPONSE_WEAK_ACCEPT;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_AGREE;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_LOG_MISMATCH;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_OUT_OF_WINDOW;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_STRONG_ACCEPT;
+import static org.apache.iotdb.consensus.natraft.utils.Response.RESPONSE_WEAK_ACCEPT;
/**
* AppendNodeEntryHandler checks if the log is successfully appended by the quorum or some node has
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
index 92ca86cfa9..401b985fe0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/LogDispatcher.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.consensus.natraft.client.AsyncRaftServiceClient;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
import org.apache.iotdb.consensus.natraft.protocol.log.VotingLog;
-import org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol.FlowMonitorManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol.FlowMonitorManager;
import org.apache.iotdb.consensus.raft.thrift.AppendEntriesRequest;
import org.apache.iotdb.consensus.raft.thrift.AppendEntryResult;
@@ -76,8 +76,7 @@ public class LogDispatcher {
public LogDispatcher(RaftMember member, RaftConfig config) {
this.member = member;
this.config = config;
- this.queueOrdered =
- !(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance());
+ this.queueOrdered = !(config.isUseFollowerSlidingWindow() && config.isEnableWeakAcceptance());
this.bindingThreadNum = config.getDispatcherBindingThreadNum();
createQueueAndBindingThreads();
}
@@ -108,8 +107,12 @@ public class LogDispatcher {
pair.getKey(),
n ->
IoTDBThreadPoolFactory.newCachedThreadPool(
- "LogDispatcher-" + member.getName() + "-" + pair.getKey().getEndpoint()
- .getIp() + "-" + pair.getKey().getEndpoint().getPort()))
+ "LogDispatcher-"
+ + member.getName()
+ + "-"
+ + pair.getKey().getEndpoint().getIp()
+ + "-"
+ + pair.getKey().getEndpoint().getPort()))
.submit(newDispatcherThread(pair.getKey(), pair.getValue()));
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowBalancer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowBalancer.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
index 3f2c88bf34..14a1b20eb8 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowBalancer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowBalancer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol;
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.consensus.common.Peer;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowMonitor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowMonitor.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
index 2f36323367..feba556131 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowMonitor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitor.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol;
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowMonitorManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
similarity index 96%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowMonitorManager.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
index 0d5f9c2719..5668e183d2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/flowcontrol/FlowMonitorManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/dispatch/flowcontrol/FlowMonitorManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log.flowcontrol;
+package org.apache.iotdb.consensus.natraft.protocol.log.dispatch.flowcontrol;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
index 0e7440d344..a81bb5924b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/logtype/RequestEntry.java
@@ -58,8 +58,7 @@ public class RequestEntry extends Entry {
ByteBuffer byteBuffer = request.serializeToByteBuffer();
byteBuffer.rewind();
dataOutputStream.writeInt(byteBuffer.remaining());
- dataOutputStream.write(
- byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.remaining());
+ dataOutputStream.write(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.remaining());
} catch (IOException e) {
// unreachable
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogCallback.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/CommitLogCallback.java
similarity index 95%
copy from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogCallback.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/CommitLogCallback.java
index 78ed30db8b..9d7c38c157 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogCallback.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/CommitLogCallback.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log;
+package org.apache.iotdb.consensus.natraft.protocol.log.manager;
import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogTask.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/CommitLogTask.java
similarity index 93%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogTask.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/CommitLogTask.java
index 3b4d33d44d..005ddfc58a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogTask.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/CommitLogTask.java
@@ -17,9 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log;
-
-import org.apache.iotdb.consensus.natraft.protocol.log.manager.RaftLogManager;
+package org.apache.iotdb.consensus.natraft.protocol.log.manager;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
index 68dc00c52f..c353d1d8cc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/DirectorySnapshotRaftLogManager.java
@@ -19,14 +19,15 @@
package org.apache.iotdb.consensus.natraft.protocol.log.manager;
-import java.io.File;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
-import org.apache.iotdb.consensus.natraft.protocol.log.serialization.StableEntryManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.DirectorySnapshot;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
+import java.io.File;
+
public class DirectorySnapshotRaftLogManager extends RaftLogManager {
private File latestSnapshotDir;
@@ -47,7 +48,13 @@ public class DirectorySnapshotRaftLogManager extends RaftLogManager {
@Override
public void takeSnapshot() {
- latestSnapshotDir = new File(config.getStorageDir() + File.separator + getName() + "-snapshot-" + System.currentTimeMillis());
+ latestSnapshotDir =
+ new File(
+ config.getStorageDir()
+ + File.separator
+ + getName()
+ + "-snapshot-"
+ + System.currentTimeMillis());
stateMachine.takeSnapshot(latestSnapshotDir);
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
index b7c9edbc47..4bf91e629e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/RaftLogManager.java
@@ -28,8 +28,8 @@ import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.applier.LogApplier;
import org.apache.iotdb.consensus.natraft.protocol.log.logtype.EmptyEntry;
-import org.apache.iotdb.consensus.natraft.protocol.log.serialization.LogManagerMeta;
-import org.apache.iotdb.consensus.natraft.protocol.log.serialization.StableEntryManager;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.LogManagerMeta;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.StableEntryManager;
import org.apache.iotdb.consensus.natraft.protocol.log.snapshot.Snapshot;
import org.slf4j.Logger;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/LogManagerMeta.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/LogManagerMeta.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
index 58edd803fb..d342bd1a54 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/LogManagerMeta.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/LogManagerMeta.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log.serialization;
+package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/StableEntryManager.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
similarity index 95%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/StableEntryManager.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
index 37ed1eb050..130ea8866f 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/StableEntryManager.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/StableEntryManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log.serialization;
+package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
import org.apache.iotdb.consensus.natraft.protocol.HardState;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/SyncLogDequeSerializer.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
similarity index 99%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/SyncLogDequeSerializer.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
index 2649064818..c8dc1d2933 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/serialization/SyncLogDequeSerializer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/manager/serialization/SyncLogDequeSerializer.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log.serialization;
+package org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -27,7 +27,7 @@ import org.apache.iotdb.consensus.natraft.protocol.HardState;
import org.apache.iotdb.consensus.natraft.protocol.RaftConfig;
import org.apache.iotdb.consensus.natraft.protocol.log.Entry;
import org.apache.iotdb.consensus.natraft.protocol.log.LogParser;
-import org.apache.iotdb.consensus.natraft.protocol.log.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
+import org.apache.iotdb.consensus.natraft.protocol.log.manager.serialization.SyncLogDequeSerializer.VersionController.SimpleFileVersionController;
import org.apache.iotdb.tsfile.utils.BytesUtils;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
index b6e9df30fb..86c96aab0a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/snapshot/DirectorySnapshot.java
@@ -4,9 +4,10 @@
package org.apache.iotdb.consensus.natraft.protocol.log.snapshot;
+import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
+
import java.io.File;
import java.nio.ByteBuffer;
-import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
public class DirectorySnapshot extends Snapshot {
private File directory;
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
index 0ce6d02ffa..41267531e0 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/service/RaftRPCServiceProcessor.java
@@ -99,10 +99,8 @@ public class RaftRPCServiceProcessor implements RaftService.AsyncIface {
@Override
public void matchTerm(
- long index,
- long term,
- TConsensusGroupId groupId,
- AsyncMethodCallback<Boolean> resultHandler) throws TException {
+ long index, long term, TConsensusGroupId groupId, AsyncMethodCallback<Boolean> resultHandler)
+ throws TException {
RaftMember member = getMember(groupId);
resultHandler.onComplete(member.matchLog(index, term));
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogCallback.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
similarity index 56%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogCallback.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
index 78ed30db8b..fa232fcbbd 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/protocol/log/CommitLogCallback.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/IOUtils.java
@@ -17,32 +17,25 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.protocol.log;
+package org.apache.iotdb.consensus.natraft.utils;
-import org.apache.iotdb.consensus.natraft.protocol.RaftMember;
-
-import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class CommitLogCallback implements AsyncMethodCallback<Void> {
+@SuppressWarnings("java:S1135")
+public class IOUtils {
- private static final Logger logger = LoggerFactory.getLogger(CommitLogCallback.class);
- private final RaftMember raftMember;
+ private static final Logger logger = LoggerFactory.getLogger(IOUtils.class);
- public CommitLogCallback(RaftMember raftMember) {
- this.raftMember = raftMember;
+ private IOUtils() {
+ // util class
}
- @Override
- public void onComplete(Void v) {
- synchronized (raftMember.getSyncLock()) {
- raftMember.getSyncLock().notifyAll();
+ public static Throwable getRootCause(Throwable e) {
+ Throwable curr = e;
+ while (curr.getCause() != null) {
+ curr = curr.getCause();
}
- }
-
- @Override
- public void onError(Exception e) {
- logger.error("async commit log failed", e);
+ return curr;
}
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/Response.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Response.java
similarity index 75%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/Response.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Response.java
index a3a592ef03..a75ce1897d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/Response.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/Response.java
@@ -1,8 +1,23 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless req [...]
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.iotdb.consensus.natraft.Utils;
+package org.apache.iotdb.consensus.natraft.utils;
/**
* Response defines the numeric responses that have special meanings. Enum class is not used for
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/StatusUtils.java b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
similarity index 98%
rename from consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/StatusUtils.java
rename to consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
index 7dd02d03e1..d7f0323627 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/natraft/Utils/StatusUtils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/natraft/utils/StatusUtils.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.consensus.natraft.Utils;
+package org.apache.iotdb.consensus.natraft.utils;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
index 4c38b6c6f9..777dec7866 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/ReplicateTest.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.consensus.raft;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.DataRegionId;
@@ -28,9 +26,9 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.natraft.RaftConsensus;
import org.apache.iotdb.consensus.raft.util.TestEntry;
import org.apache.iotdb.consensus.raft.util.TestStateMachine;
-import org.apache.iotdb.consensus.natraft.RaftConsensus;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/FakeDataSet.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/FakeDataSet.java
index fd87f0cce0..f16415dfe0 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/FakeDataSet.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/FakeDataSet.java
@@ -1,14 +1,30 @@
/*
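- * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License.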
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
package org.apache.iotdb.consensus.raft.util;
-import java.util.Objects;
-import java.util.Set;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import java.util.Objects;
+import java.util.Set;
+
public class FakeDataSet implements DataSet {
private final Set<IConsensusRequest> requestSet;
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestEntry.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestEntry.java
index 838206722f..39d46dae0d 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestEntry.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestEntry.java
@@ -19,13 +19,14 @@
package org.apache.iotdb.consensus.raft.util;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
+import org.apache.iotdb.tsfile.utils.PublicBAOS;
+
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
-import org.apache.iotdb.consensus.common.Peer;
-import org.apache.iotdb.consensus.common.request.IoTConsensusRequest;
-import org.apache.iotdb.tsfile.utils.PublicBAOS;
public class TestEntry extends IoTConsensusRequest {
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
index 8d31df4856..69ae1bd783 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/raft/util/TestStateMachine.java
@@ -19,12 +19,6 @@
package org.apache.iotdb.consensus.raft.util;
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
@@ -32,9 +26,17 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
private static final Logger logger = LoggerFactory.getLogger(TestStateMachine.class);
@@ -44,6 +46,7 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
public Set<IConsensusRequest> getRequestSet() {
return requestSet;
}
+
@Override
public void start() {}
@@ -68,6 +71,7 @@ public class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
byteBuffer.rewind();
return TestEntry.deserialize(byteBuffer);
}
+
@Override
public synchronized DataSet read(IConsensusRequest request) {
return new FakeDataSet(requestSet);