You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by wa...@apache.org on 2022/05/06 02:18:56 UTC
[iotdb] branch master updated: [IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
This is an automated email from the ASF dual-hosted git repository.
wangchao316 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f9b3b0f8d2 [IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
f9b3b0f8d2 is described below
commit f9b3b0f8d2b8a8ce07b65a37aacfb2b16ed003a7
Author: Potato <TX...@gmail.com>
AuthorDate: Fri May 6 10:18:52 2022 +0800
[IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
[IOTDB-3104] Add Consensus Module StateMachine Event API (#5806)
---
.../statemachine/PartitionRegionStateMachine.java | 2 +-
.../org/apache/iotdb/consensus/IStateMachine.java | 40 +++++++++++++++-
.../org/apache/iotdb/consensus/common/Peer.java | 5 ++
.../ratis/ApplicationStateMachineProxy.java | 36 +++++++++++++--
.../iotdb/consensus/ratis/RatisConsensus.java | 53 ++++++++++++----------
.../org/apache/iotdb/consensus/ratis/Utils.java | 47 +++++++++++++------
.../apache/iotdb/consensus/EmptyStateMachine.java | 2 +-
.../iotdb/consensus/ratis/RatisConsensusTest.java | 13 +++++-
.../apache/iotdb/consensus/ratis/SnapshotTest.java | 2 +-
.../apache/iotdb/consensus/ratis/TestUtils.java | 37 ++++++++++++++-
.../apache/iotdb/consensus/ratis/UtilsTest.java | 4 +-
.../standalone/StandAloneConsensusTest.java | 2 +-
.../consensus/statemachine/BaseStateMachine.java | 2 +-
13 files changed, 194 insertions(+), 51 deletions(-)
diff --git a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index cb5048070a..0eb69d74e4 100644
--- a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -36,7 +36,7 @@ import java.io.File;
import java.io.IOException;
/** Statemachine for PartitionRegion */
-public class PartitionRegionStateMachine implements IStateMachine {
+public class PartitionRegionStateMachine implements IStateMachine, IStateMachine.EventApi {
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionRegionStateMachine.class);
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
index e12622f2ad..203b7d2a6b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java
@@ -19,14 +19,17 @@
package org.apache.iotdb.consensus;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import javax.annotation.concurrent.ThreadSafe;
import java.io.File;
+import java.util.List;
import java.util.function.Function;
@ThreadSafe
@@ -54,7 +57,7 @@ public interface IStateMachine {
/**
* Take a snapshot of current statemachine. All files are required to be stored under snapshotDir,
- * which is a sub-directory of the StorageDir in Consensus
+ * which is a subdirectory of the StorageDir in Consensus
*
* @param snapshotDir required storage dir
* @return true if snapshot is successfully taken
@@ -67,4 +70,39 @@ public interface IStateMachine {
* @param latestSnapshotRootDir dir where the latest snapshot sits
*/
void loadSnapshot(File latestSnapshotRootDir);
+
+ /** An optional API for event notifications. */
+ interface EventApi {
+ /**
+ * Notify the {@link IStateMachine} that a new leader has been elected. Note that the new leader
+ * can possibly be this server.
+ *
+ * @param groupId The id of this consensus group.
+ * @param newLeader The endpoint of the new leader.
+ */
+ default void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint newLeader) {}
+
+ /**
+ * Notify the {@link IStateMachine} of a configuration change. This method will be invoked when
+ * a new configuration is processed.
+ *
+ * @param term term of the current log entry
+ * @param index index which is being updated
+ * @param newConfiguration new configuration
+ */
+ default void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {}
+ }
+
+ /**
+ * Get the {@link IStateMachine.EventApi} object.
+ *
+ * <p>If this {@link IStateMachine} chooses to support the optional {@link
+ * IStateMachine.EventApi}, it may either implement {@link IStateMachine.EventApi} directly or
+ * override this method to return an {@link IStateMachine.EventApi} object.
+ *
+ * @return The {@link IStateMachine.EventApi} object.
+ */
+ default IStateMachine.EventApi event() {
+ return (IStateMachine.EventApi) this;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
index db537e9022..99379156fb 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -59,4 +59,9 @@ public class Peer {
public int hashCode() {
return Objects.hash(groupId, endpoint);
}
+
+ @Override
+ public String toString() {
+ return "Peer{" + "groupId=" + groupId + ", endpoint=" + endpoint + '}';
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index 123bd4f13d..3617a36b0c 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -25,8 +25,11 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -49,11 +52,13 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
// Raft Storage sub dir for statemachine data, default (_sm)
private File statemachineDir;
private final SnapshotStorage snapshotStorage;
+ private final RaftGroupId groupId;
- public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+ public ApplicationStateMachineProxy(IStateMachine stateMachine, RaftGroupId id) {
applicationStateMachine = stateMachine;
snapshotStorage = new SnapshotStorage(applicationStateMachine);
applicationStateMachine.start();
+ groupId = id;
}
@Override
@@ -69,7 +74,7 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
}
@Override
- public void reinitialize() throws IOException {
+ public void reinitialize() {
setLastAppliedTermIndex(null);
loadSnapshot(snapshotStorage.findLatestSnapshotDir());
if (getLifeCycleState() == LifeCycle.State.PAUSED) {
@@ -136,11 +141,15 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
// require the application statemachine to take the latest snapshot
String metadata = Utils.getMetadataFromTermIndex(lastApplied);
File snapshotDir = snapshotStorage.getSnapshotDir(metadata);
+ snapshotDir.mkdir();
+ if (!snapshotDir.isDirectory()) {
+ logger.error("Unable to create snapshotDir at {}", snapshotDir);
+ return RaftLog.INVALID_LOG_INDEX;
+ }
boolean success = applicationStateMachine.takeSnapshot(snapshotDir);
if (!success) {
return RaftLog.INVALID_LOG_INDEX;
}
-
return lastApplied.getIndex();
}
@@ -159,4 +168,25 @@ public class ApplicationStateMachineProxy extends BaseStateMachine {
public StateMachineStorage getStateMachineStorage() {
return snapshotStorage;
}
+
+ @Override
+ public void notifyLeaderChanged(RaftGroupMemberId groupMemberId, RaftPeerId newLeaderId) {
+ applicationStateMachine
+ .event()
+ .notifyLeaderChanged(
+ Utils.fromRaftGroupIdToConsensusGroupId(groupMemberId.getGroupId()),
+ Utils.formRaftPeerIdToTEndPoint(newLeaderId));
+ }
+
+ @Override
+ public void notifyConfigurationChanged(
+ long term, long index, RaftConfigurationProto newRaftConfiguration) {
+ applicationStateMachine
+ .event()
+ .notifyConfigurationChanged(
+ term,
+ index,
+ Utils.fromRaftProtoListAndRaftGroupIdToPeers(
+ newRaftConfiguration.getPeersList(), groupId));
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 96b3b766e3..8d644bbc81 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.client.ClientPoolProperty;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.IClientPoolFactory;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
@@ -109,7 +110,7 @@ class RatisConsensus implements IConsensus {
public RatisConsensus(TEndPoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
throws IOException {
String address = Utils.IPAddress(endpoint);
- myself = Utils.toRaftPeer(endpoint, DEFAULT_PRIORITY);
+ myself = Utils.fromTEndPointAndPriorityToRaftPeer(endpoint, DEFAULT_PRIORITY);
RaftServerConfigKeys.setStorageDir(properties, Collections.singletonList(ratisStorageDir));
RaftServerConfigKeys.Snapshot.setAutoTriggerEnabled(properties, true);
@@ -126,7 +127,8 @@ class RatisConsensus implements IConsensus {
.setStateMachineRegistry(
raftGroupId ->
new ApplicationStateMachineProxy(
- registry.apply(Utils.toConsensusGroupId(raftGroupId))))
+ registry.apply(Utils.fromRaftGroupIdToConsensusGroupId(raftGroupId)),
+ raftGroupId))
.build();
}
@@ -150,7 +152,7 @@ class RatisConsensus implements IConsensus {
ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
// pre-condition: group exists and myself server serves this group
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup raftGroup = getGroupInfo(raftGroupId);
if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
return failedWrite(new ConsensusGroupNotExistException(groupId));
@@ -198,7 +200,7 @@ class RatisConsensus implements IConsensus {
}
if (suggestedLeader != null) {
- TEndPoint leaderEndPoint = Utils.getEndpoint(suggestedLeader);
+ TEndPoint leaderEndPoint = Utils.formRaftPeerIdToTEndPoint(suggestedLeader.getId());
writeResult.setRedirectNode(new TEndPoint(leaderEndPoint.getIp(), leaderEndPoint.getPort()));
}
@@ -209,7 +211,7 @@ class RatisConsensus implements IConsensus {
@Override
public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
- RaftGroup group = getGroupInfo(Utils.toRatisGroupId(groupId));
+ RaftGroup group = getGroupInfo(Utils.fromConsensusGroupIdToRaftGroupId(groupId));
if (group == null || !group.getPeers().contains(myself)) {
return failedRead(new ConsensusGroupNotExistException(groupId));
}
@@ -278,7 +280,7 @@ class RatisConsensus implements IConsensus {
*/
@Override
public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup raftGroup = getGroupInfo(raftGroupId);
// pre-conditions: group exists and myself in this group
@@ -318,9 +320,9 @@ class RatisConsensus implements IConsensus {
*/
@Override
public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup group = getGroupInfo(raftGroupId);
- RaftPeer peerToAdd = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
+ RaftPeer peerToAdd = Utils.fromTEndPointAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
@@ -353,9 +355,9 @@ class RatisConsensus implements IConsensus {
*/
@Override
public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup group = getGroupInfo(raftGroupId);
- RaftPeer peerToRemove = Utils.toRaftPeer(peer, DEFAULT_PRIORITY);
+ RaftPeer peerToRemove = Utils.fromTEndPointAndPriorityToRaftPeer(peer, DEFAULT_PRIORITY);
// pre-conditions: group exists and myself in this group
if (group == null || !group.getPeers().contains(myself)) {
@@ -413,14 +415,14 @@ class RatisConsensus implements IConsensus {
// first fetch the newest information
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftGroup raftGroup = getGroupInfo(raftGroupId);
if (raftGroup == null) {
return failed(new ConsensusGroupNotExistException(groupId));
}
- RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader, LEADER_PRIORITY);
+ RaftPeer newRaftLeader = Utils.fromTEndPointAndPriorityToRaftPeer(newLeader, LEADER_PRIORITY);
ArrayList<RaftPeer> newConfiguration = new ArrayList<>();
for (RaftPeer raftPeer : raftGroup.getPeers()) {
@@ -428,7 +430,9 @@ class RatisConsensus implements IConsensus {
newConfiguration.add(newRaftLeader);
} else {
// degrade every other peer to default priority
- newConfiguration.add(Utils.toRaftPeer(Utils.getEndpoint(raftPeer), DEFAULT_PRIORITY));
+ newConfiguration.add(
+ Utils.fromTEndPointAndPriorityToRaftPeer(
+ Utils.formRaftPeerIdToTEndPoint(raftPeer.getId()), DEFAULT_PRIORITY));
}
}
@@ -459,7 +463,7 @@ class RatisConsensus implements IConsensus {
@Override
public boolean isLeader(ConsensusGroupId groupId) {
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
boolean isLeader;
try {
@@ -475,10 +479,10 @@ class RatisConsensus implements IConsensus {
@Override
public Peer getLeader(ConsensusGroupId groupId) {
if (isLeader(groupId)) {
- return new Peer(groupId, Utils.parseFromRatisId(myself.getId().toString()));
+ return new Peer(groupId, Utils.formRaftPeerIdToTEndPoint(myself.getId()));
}
- RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+ RaftGroupId raftGroupId = Utils.fromConsensusGroupIdToRaftGroupId(groupId);
RaftClient client;
try {
client = server.getDivision(raftGroupId).getRaftClient();
@@ -486,7 +490,7 @@ class RatisConsensus implements IConsensus {
logger.warn("cannot find raft client for group " + groupId);
return null;
}
- TEndPoint leaderEndpoint = Utils.parseFromRatisId(client.getLeaderId().toString());
+ TEndPoint leaderEndpoint = Utils.formRaftPeerIdToTEndPoint(client.getLeaderId());
return new Peer(groupId, leaderEndpoint);
}
@@ -513,7 +517,7 @@ class RatisConsensus implements IConsensus {
.setServerId(server.getId())
.setClientId(localFakeId)
.setCallId(localFakeCallId.incrementAndGet())
- .setGroupId(Utils.toRatisGroupId(groupId))
+ .setGroupId(Utils.fromConsensusGroupIdToRaftGroupId(groupId))
.setType(type)
.setMessage(message)
.build();
@@ -536,11 +540,9 @@ class RatisConsensus implements IConsensus {
}
private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
- List<RaftPeer> raftPeers =
- peers.stream()
- .map(peer -> Utils.toRaftPeer(peer, DEFAULT_PRIORITY))
- .collect(Collectors.toList());
- return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+ return RaftGroup.valueOf(
+ Utils.fromConsensusGroupIdToRaftGroupId(groupId),
+ Utils.fromPeersAndPriorityToRaftPeers(peers, DEFAULT_PRIORITY));
}
private RatisClient getRaftClient(RaftGroup group) throws IOException {
@@ -575,6 +577,11 @@ class RatisConsensus implements IConsensus {
return reply;
}
+ @TestOnly
+ public RaftServer getServer() {
+ return server;
+ }
+
private class RatisClientPoolFactory implements IClientPoolFactory<RaftGroup, RatisClient> {
@Override
public KeyedObjectPool<RaftGroup, RatisClient> createClientPool(
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
index 0c3e5d984f..e1773c7f80 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.thrift.TException;
@@ -33,6 +35,8 @@ import org.apache.thrift.transport.TByteBuffer;
import java.io.File;
import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
public class Utils {
private static final int tempBufferSize = 1024;
@@ -42,7 +46,7 @@ public class Utils {
return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
}
- /** Encode the ConsensusGroupId into 6 bytes 2 Bytes for Group Type 4 Bytes for Group ID */
+ /** Encode the ConsensusGroupId into 6 bytes: 2 Bytes for Group Type and 4 Bytes for Group ID */
public static long groupEncode(ConsensusGroupId consensusGroupId) {
// use abbreviations to prevent overflow
long groupType = consensusGroupId.getType().getValue();
@@ -51,36 +55,49 @@ public class Utils {
return groupCode;
}
- public static String RatisPeerId(TEndPoint endpoint) {
- return String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+ public static RaftPeerId fromTEndPointToRaftPeerId(TEndPoint endpoint) {
+ return RaftPeerId.valueOf(String.format("%s-%d", endpoint.getIp(), endpoint.getPort()));
}
- public static TEndPoint parseFromRatisId(String ratisId) {
- String[] items = ratisId.split("-");
+ public static TEndPoint formRaftPeerIdToTEndPoint(RaftPeerId id) {
+ String[] items = id.toString().split("-");
+ return new TEndPoint(items[0], Integer.parseInt(items[1]));
+ }
+
+ public static TEndPoint formRaftPeerProtoToTEndPoint(RaftPeerProto proto) {
+ String[] items = proto.getAddress().split(":");
return new TEndPoint(items[0], Integer.parseInt(items[1]));
}
// priority is used as ordinal of leader election
- public static RaftPeer toRaftPeer(TEndPoint endpoint, int priority) {
+ public static RaftPeer fromTEndPointAndPriorityToRaftPeer(TEndPoint endpoint, int priority) {
return RaftPeer.newBuilder()
- .setId(RatisPeerId(endpoint))
+ .setId(fromTEndPointToRaftPeerId(endpoint))
.setAddress(IPAddress(endpoint))
.setPriority(priority)
.build();
}
- public static RaftPeer toRaftPeer(Peer peer, int priority) {
- return toRaftPeer(peer.getEndpoint(), priority);
+ public static RaftPeer fromTEndPointAndPriorityToRaftPeer(Peer peer, int priority) {
+ return fromTEndPointAndPriorityToRaftPeer(peer.getEndpoint(), priority);
+ }
+
+ public static List<RaftPeer> fromPeersAndPriorityToRaftPeers(List<Peer> peers, int priority) {
+ return peers.stream()
+ .map(peer -> Utils.fromTEndPointAndPriorityToRaftPeer(peer, priority))
+ .collect(Collectors.toList());
}
- public static TEndPoint getEndpoint(RaftPeer raftPeer) {
- String address = raftPeer.getAddress(); // ip:port
- String[] split = address.split(":");
- return new TEndPoint(split[0], Integer.parseInt(split[1]));
+ public static List<Peer> fromRaftProtoListAndRaftGroupIdToPeers(
+ List<RaftPeerProto> raftProtoList, RaftGroupId id) {
+ ConsensusGroupId consensusGroupId = Utils.fromRaftGroupIdToConsensusGroupId(id);
+ return raftProtoList.stream()
+ .map(peer -> new Peer(consensusGroupId, Utils.formRaftPeerProtoToTEndPoint(peer)))
+ .collect(Collectors.toList());
}
/** Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme: */
- public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+ public static RaftGroupId fromConsensusGroupIdToRaftGroupId(ConsensusGroupId consensusGroupId) {
long groupCode = groupEncode(consensusGroupId);
byte[] bGroupCode = ByteBuffer.allocate(Long.BYTES).putLong(groupCode).array();
byte[] bPaddedGroupName = new byte[16];
@@ -93,7 +110,7 @@ public class Utils {
}
/** Given raftGroupId, decrypt ConsensusGroupId out of it */
- public static ConsensusGroupId toConsensusGroupId(RaftGroupId raftGroupId) {
+ public static ConsensusGroupId fromRaftGroupIdToConsensusGroupId(RaftGroupId raftGroupId) {
byte[] padded = raftGroupId.toByteString().toByteArray();
long type = (padded[10] << 8) + padded[11];
ByteBuffer byteBuffer = ByteBuffer.allocate(Integer.BYTES);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
index e1ea8fa1d4..01992f472f 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import java.io.File;
-public class EmptyStateMachine implements IStateMachine {
+public class EmptyStateMachine implements IStateMachine, IStateMachine.EventApi {
@Override
public void start() {}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index 2b3e952823..4eee289586 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -54,6 +54,7 @@ public class RatisConsensusTest {
private List<Peer> peers;
private List<File> peersStorage;
private List<IConsensus> servers;
+ private List<TestUtils.IntegerCounter> stateMachines;
private ConsensusGroup group;
private Peer peer0;
private Peer peer1;
@@ -62,12 +63,14 @@ public class RatisConsensusTest {
private void makeServers() throws IOException {
for (int i = 0; i < 3; i++) {
+ stateMachines.add(new TestUtils.IntegerCounter());
+ int finalI = i;
servers.add(
ConsensusFactory.getConsensusImpl(
RATIS_CLASS_NAME,
peers.get(i).getEndpoint(),
peersStorage.get(i),
- groupId -> new TestUtils.IntegerCounter())
+ groupId -> stateMachines.get(finalI))
.orElseThrow(
() ->
new IllegalArgumentException(
@@ -95,6 +98,7 @@ public class RatisConsensusTest {
}
group = new ConsensusGroup(gid, peers);
servers = new ArrayList<>();
+ stateMachines = new ArrayList<>();
makeServers();
}
@@ -130,6 +134,9 @@ public class RatisConsensusTest {
servers.get(0).removeConsensusGroup(gid);
servers.get(2).removeConsensusGroup(gid);
Assert.assertEquals(servers.get(1).getLeader(gid).getEndpoint(), peers.get(1).getEndpoint());
+ Assert.assertEquals(stateMachines.get(1).getLeaderEndpoint(), peers.get(1).getEndpoint());
+ Assert.assertEquals(stateMachines.get(1).getConfiguration().size(), 1);
+ Assert.assertEquals(stateMachines.get(1).getConfiguration().get(0), peers.get(1));
// 4. try consensus again with one peer
doConsensus(servers.get(1), gid, 10, 20);
@@ -141,6 +148,7 @@ public class RatisConsensusTest {
// then use addPeer to inform the group leader of configuration change
servers.get(1).addPeer(gid, peer0);
servers.get(1).addPeer(gid, peer2);
+ Assert.assertEquals(stateMachines.get(1).getConfiguration().size(), 3);
// 6. try consensus with all 3 peers
doConsensus(servers.get(2), gid, 10, 30);
@@ -150,6 +158,9 @@ public class RatisConsensusTest {
servers.get(0).changePeer(group.getGroupId(), Collections.singletonList(peer0));
servers.get(1).removeConsensusGroup(group.getGroupId());
servers.get(2).removeConsensusGroup(group.getGroupId());
+ Assert.assertEquals(stateMachines.get(0).getLeaderEndpoint(), peers.get(0).getEndpoint());
+ Assert.assertEquals(stateMachines.get(0).getConfiguration().size(), 1);
+ Assert.assertEquals(stateMachines.get(0).getConfiguration().get(0), peers.get(0));
// 8. try consensus with only peer0
doConsensus(servers.get(0), gid, 10, 40);
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
index 3af8c5df5e..e42f981706 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/SnapshotTest.java
@@ -87,7 +87,7 @@ public class SnapshotTest {
@Test
public void testSnapshot() throws Exception {
ApplicationStateMachineProxy proxy =
- new ApplicationStateMachineProxy(new TestUtils.IntegerCounter());
+ new ApplicationStateMachineProxy(new TestUtils.IntegerCounter(), null);
proxy.initialize(null, null, new EmptyStorageWithOnlySMDir());
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
index eca57c4a19..0e36ea878e 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java
@@ -18,10 +18,13 @@
*/
package org.apache.iotdb.consensus.ratis;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.consensus.IStateMachine;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
@@ -33,6 +36,7 @@ import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicInteger;
@@ -61,9 +65,11 @@ public class TestUtils {
}
}
- static class IntegerCounter implements IStateMachine {
+ static class IntegerCounter implements IStateMachine, IStateMachine.EventApi {
private AtomicInteger integer;
private final Logger logger = LoggerFactory.getLogger(IntegerCounter.class);
+ private TEndPoint leaderEndpoint;
+ private List<Peer> configuration;
@Override
public void start() {
@@ -113,6 +119,27 @@ public class TestUtils {
}
}
+ @Override
+ public void notifyLeaderChanged(ConsensusGroupId groupId, TEndPoint newLeader) {
+ this.leaderEndpoint = newLeader;
+ System.out.println("---------newLeader-----------");
+ System.out.println(groupId);
+ System.out.println(newLeader);
+ System.out.println("----------------------");
+ }
+
+ @Override
+ public void notifyConfigurationChanged(long term, long index, List<Peer> newConfiguration) {
+ this.configuration = newConfiguration;
+ System.out.println("----------newConfiguration------------");
+ System.out.println("term : " + term);
+ System.out.println("index : " + index);
+ for (Peer peer : newConfiguration) {
+ System.out.println(peer);
+ }
+ System.out.println("----------------------");
+ }
+
@TestOnly
public static synchronized String ensureSnapshotFileName(File snapshotDir, String metadata) {
File dir = new File(snapshotDir + File.separator + metadata);
@@ -121,5 +148,13 @@ public class TestUtils {
}
return dir.getPath() + File.separator + "snapshot";
}
+
+ public TEndPoint getLeaderEndpoint() {
+ return leaderEndpoint;
+ }
+
+ public List<Peer> getConfiguration() {
+ return configuration;
+ }
}
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
index 9d11b7967e..794c23289a 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/UtilsTest.java
@@ -29,8 +29,8 @@ public class UtilsTest {
@Test
public void testEncryption() {
ConsensusGroupId raw = new PartitionRegionId(100);
- RaftGroupId id = Utils.toRatisGroupId(raw);
- ConsensusGroupId cgid = Utils.toConsensusGroupId(id);
+ RaftGroupId id = Utils.fromConsensusGroupIdToRaftGroupId(raw);
+ ConsensusGroupId cgid = Utils.fromRaftGroupIdToConsensusGroupId(id);
Assert.assertEquals(raw.getId(), cgid.getId());
Assert.assertEquals(raw.getType(), cgid.getType());
}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5e814f7151..776257e2f8 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -82,7 +82,7 @@ public class StandAloneConsensusTest {
}
}
- private static class TestStateMachine implements IStateMachine {
+ private static class TestStateMachine implements IStateMachine, IStateMachine.EventApi {
private final boolean direction;
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
index 2298d69272..e46ad09a99 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class BaseStateMachine implements IStateMachine {
+public abstract class BaseStateMachine implements IStateMachine, IStateMachine.EventApi {
private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);