You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/20 01:09:40 UTC
[incubator-ratis] branch master updated: RATIS-1251. Move
StateMachine and TransactionContext to ratis-server-api (#363)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f511964 RATIS-1251. Move StateMachine and TransactionContext to ratis-server-api (#363)
f511964 is described below
commit f51196455679443be438a1ddb04bd6860f4ad6dc
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 20 09:09:30 2020 +0800
RATIS-1251. Move StateMachine and TransactionContext to ratis-server-api (#363)
* RATIS-1251. Move StateMachine and TransactionContext to ratis-server-api.
* Fix a bug.
---
.../java/org/apache/ratis/protocol/ClientId.java | 3 +-
.../org/apache/ratis/util/ReflectionUtils.java | 11 ++++-
.../apache/ratis/statemachine/SnapshotInfo.java | 12 ++----
.../statemachine}/SnapshotRetentionPolicy.java | 10 ++---
.../apache/ratis/statemachine/StateMachine.java | 23 ++++++-----
.../ratis/statemachine/StateMachineStorage.java | 3 +-
.../ratis/statemachine/TransactionContext.java | 40 +++++++++++-------
.../ratis/server/impl/ConfigurationManager.java | 3 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 3 +-
.../org/apache/ratis/server/impl/ServerState.java | 25 ++++-------
.../ratis/server/impl/StateMachineUpdater.java | 2 +-
.../apache/ratis/server/raftlog/LogProtoUtils.java | 13 ++----
.../ratis/statemachine/impl/BaseStateMachine.java | 7 ++++
.../impl/SimpleStateMachineStorage.java | 3 +-
.../statemachine/impl/TransactionContextImpl.java | 48 ++++++++++++----------
.../ratis/server/storage/TestRaftStorage.java | 2 +-
16 files changed, 110 insertions(+), 98 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
index 84f7d72..8c3e4c7 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ClientId.java
@@ -19,6 +19,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import java.util.Optional;
import java.util.UUID;
/**
@@ -37,7 +38,7 @@ public final class ClientId extends RaftId {
}
public static ClientId valueOf(ByteString data) {
- return new ClientId(data);
+ return Optional.ofNullable(data).filter(d -> !d.isEmpty()).map(ClientId::new).orElse(EMPTY_CLIENT_ID);
}
public static ClientId valueOf(UUID uuid) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
index 3227ae7..afd3817 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReflectionUtils.java
@@ -200,8 +200,15 @@ public interface ReflectionUtils {
return false;
}
- static <BASE> Class<? extends BASE> getClass(
- String subClassName, Class<BASE> base) {
+ static String getImplClassName(Class<?> clazz) {
+ return clazz.getPackage().getName() + ".impl." + JavaUtils.getClassSimpleName(clazz) + "Impl";
+ }
+
+ static <BASE> Class<? extends BASE> getImplClass(Class<BASE> base) {
+ return getClass(getImplClassName(base), base);
+ }
+
+ static <BASE> Class<? extends BASE> getClass(String subClassName, Class<BASE> base) {
try {
return getClassByName(subClassName).asSubclass(base);
} catch (ClassNotFoundException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
similarity index 69%
rename from ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
rename to ratis-server-api/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
index e8fb288..c17e7e6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/SnapshotInfo.java
@@ -23,10 +23,9 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
/**
- * SnapshotInfo represents a durable state by the state machine. The state machine implementation is
- * responsible for the layout of the snapshot files as well as making the data durable. Latest term,
- * latest index, and the raft configuration must be saved together with any data files in the
- * snapshot.
+ * The information of a state machine snapshot,
+ * where a snapshot captures the states at a particular {@link TermIndex}.
+ * Each state machine implementation must define its snapshot format and persist snapshots in a durable storage.
*/
public interface SnapshotInfo {
@@ -50,10 +49,7 @@ public interface SnapshotInfo {
}
/**
- * Returns a list of files corresponding to this snapshot. This list should include all
- * the files that the state machine keeps in its data directory. This list of files will be
- * copied as to other replicas in install snapshot RPCs.
- * @return a list of Files corresponding to the this snapshot.
+ * @return a list of underlying files of this snapshot.
*/
List<FileInfo> getFiles();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/SnapshotRetentionPolicy.java
similarity index 76%
rename from ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
rename to ratis-server-api/src/main/java/org/apache/ratis/statemachine/SnapshotRetentionPolicy.java
index 30e81b3..adfb521 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SnapshotRetentionPolicy.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/SnapshotRetentionPolicy.java
@@ -15,20 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.ratis.statemachine.impl;
+package org.apache.ratis.statemachine;
/**
- * Policy for retention of Ratis snapshot files.
+ * Retention policy of state machine snapshots.
*/
public interface SnapshotRetentionPolicy {
-
int DEFAULT_ALL_SNAPSHOTS_RETAINED = -1;
/**
- * If a retention policy is configured, get the number of recent snapshots to
- * retain. Default is -1, which means Ratis will retain ALL old snapshots.
- * @return number of recent snapshots to retain.
+ * @return -1 for retaining all the snapshots; otherwise, return the number of snapshots to be retained.
*/
default int getNumSnapshotsRetained() {
return DEFAULT_ALL_SNAPSHOTS_RETAINED;
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
similarity index 95%
rename from ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
rename to ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index c454392..7d1250e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -18,19 +18,19 @@
package org.apache.ratis.statemachine;
import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
-import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,9 +55,9 @@ public interface StateMachine extends Closeable {
}
/**
- * An optional API for managing data outside the {@link org.apache.ratis.server.raftlog.RaftLog}.
+ * An optional API for managing data outside the raft log.
* For data intensive applications, it can be more efficient to implement this API
- * in order to support zero buffer coping and a light-weighted {@link org.apache.ratis.server.raftlog.RaftLog}.
+ * in order to support zero buffer coping and a light-weighted raft log.
*/
interface DataApi {
/** A noop implementation of {@link DataApi}. */
@@ -328,11 +328,11 @@ public interface StateMachine extends Closeable {
}
/**
- * Initializes the State Machine with the given server, group and storage. The state machine is
- * responsible reading the latest snapshot from the file system (if any) and initialize itself
- * with the latest term and index there including all the edits.
+ * Initializes the State Machine with the given parameter.
+ * The state machine must, if there is any, read the latest snapshot.
*/
- void initialize(RaftServer server, RaftGroupId groupId, RaftStorage storage) throws IOException;
+ //TODO change the raftServer parameter back to RaftServer once RaftServer has been moved to ratis-server-api
+ void initialize(Object raftServer, RaftGroupId raftGroupId, RaftStorage storage) throws IOException;
/**
* Returns the lifecycle state for this StateMachine.
@@ -451,11 +451,12 @@ public interface StateMachine extends Closeable {
TermIndex getLastAppliedTermIndex();
/**
- * Converts the proto object into a useful log string to add information about state machine data.
+ * Converts the given proto to a string.
+ *
* @param proto state machine proto
* @return the string representation of the proto.
*/
default String toStateMachineLogEntryString(RaftProtos.StateMachineLogEntryProto proto) {
- return LogProtoUtils.toStateMachineLogEntryString(proto, null);
+ return JavaUtils.getClassSimpleName(proto.getClass()) + ":" + ClientInvocationId.valueOf(proto);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
similarity index 95%
rename from ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
rename to ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
index b96b654..86ffdf6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachineStorage.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.ratis.statemachine;
import java.io.IOException;
import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
public interface StateMachineStorage {
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
similarity index 82%
rename from ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
rename to ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
index b6ed26b..3821b05 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/TransactionContext.java
@@ -21,10 +21,9 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.server.raftlog.LogProtoUtils;
-import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ReflectionUtils;
import java.io.IOException;
import java.util.Objects;
@@ -94,13 +93,6 @@ public interface TransactionContext {
LogEntryProto initLogEntry(long term, long index);
/**
- * Sets the data from the {@link StateMachine}
- * @param smLogEntryProto data from {@link StateMachine}
- * @return the current {@link TransactionContext} itself
- */
- TransactionContext setStateMachineLogEntryProto(StateMachineLogEntryProto smLogEntryProto);
-
- /**
* Returns the committed log entry
* @return the committed log entry
*/
@@ -199,16 +191,36 @@ public interface TransactionContext {
Preconditions.assertTrue(serverRole == RaftPeerRole.LEADER,
() -> "serverRole MUST be LEADER since clientRequest != null, serverRole is " + serverRole);
Preconditions.assertNull(logEntry, () -> "logEntry MUST be null since clientRequest != null");
- if (stateMachineLogEntry == null) {
- stateMachineLogEntry = LogProtoUtils.toStateMachineLogEntryProto(clientRequest, logData, stateMachineData);
- }
- return new TransactionContextImpl(stateMachine, clientRequest, stateMachineLogEntry, stateMachineContext);
+ return newTransactionContext(stateMachine, clientRequest,
+ stateMachineLogEntry, logData, stateMachineData, stateMachineContext);
} else {
Objects.requireNonNull(logEntry, "logEntry MUST NOT be null since clientRequest == null");
Preconditions.assertTrue(logEntry.hasStateMachineLogEntry(),
() -> "Unexpected logEntry: stateMachineLogEntry not found, logEntry=" + logEntry);
- return new TransactionContextImpl(serverRole, stateMachine, logEntry);
+ return newTransactionContext(serverRole, stateMachine, logEntry);
}
}
+
+ /** Get the impl class using reflection. */
+ private static final Class<? extends TransactionContext> IMPL_CLASS
+ = ReflectionUtils.getImplClass(TransactionContext.class);
+
+ /** @return a new {@link TransactionContext} using reflection. */
+ private static TransactionContext newTransactionContext(
+ StateMachine stateMachine, RaftClientRequest clientRequest,
+ StateMachineLogEntryProto stateMachineLogEntry, ByteString logData, ByteString stateMachineData,
+ Object stateMachineContext) {
+ final Class<?>[] argClasses = {RaftClientRequest.class, StateMachine.class,
+ StateMachineLogEntryProto.class, ByteString.class, ByteString.class, Object.class};
+ return ReflectionUtils.newInstance(IMPL_CLASS, argClasses,
+ clientRequest, stateMachine, stateMachineLogEntry, logData, stateMachineData, stateMachineContext);
+ }
+
+ /** @return a new {@link TransactionContext} using reflection. */
+ private static TransactionContext newTransactionContext(
+ RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) {
+ final Class<?>[] argClasses = {RaftPeerRole.class, StateMachine.class, LogEntryProto.class};
+ return ReflectionUtils.newInstance(IMPL_CLASS, argClasses, serverRole, stateMachine, logEntry);
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
index 019047f..ed51a65 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ConfigurationManager.java
@@ -44,7 +44,8 @@ public class ConfigurationManager {
this.currentConf = initialConf;
}
- synchronized void addConfiguration(long logIndex, RaftConfiguration conf) {
+ synchronized void addConfiguration(RaftConfiguration conf) {
+ final long logIndex = conf.getLogEntryIndex();
final RaftConfiguration found = configurations.get(logIndex);
if (found != null) {
Preconditions.assertTrue(found.equals(conf));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index ccb033f..c35eeb7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -442,8 +442,9 @@ class LeaderStateImpl implements LeaderState {
}
private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
+ Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
voterLists = divideFollowers(newConf);
- server.getState().setRaftConf(logIndex, newConf);
+ server.getState().setRaftConf(newConf);
}
void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e6b4674..fe5e1f4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -41,7 +41,6 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -109,7 +108,9 @@ class ServerState implements Closeable {
storage = new RaftStorageImpl(dir, RaftServerConfigKeys.Log.corruptionPolicy(prop));
snapshotManager = new SnapshotManager(storage, id);
- initStatemachine(stateMachine, group.getGroupId());
+ stateMachine.initialize(server.getRaftServer(), group.getGroupId(), storage);
+ // read configuration from the storage
+ Optional.ofNullable(storage.readRaftConfiguration()).ifPresent(this::setRaftConf);
// On start the leader is null, start the clock now
leaderId = null;
@@ -154,22 +155,12 @@ class ServerState implements Closeable {
return resultList.get(0);
}
return numberOfStorageDirPerVolume.entrySet().stream()
- .min(Comparator.comparing(Map.Entry::getValue))
+ .min(Map.Entry.comparingByValue())
.map(Map.Entry::getKey)
.map(v -> new File(v, targetSubDir))
.orElseThrow(() -> new IOException("No storage directory found."));
}
- private void initStatemachine(StateMachine sm, RaftGroupId gid)
- throws IOException {
- sm.initialize(server.getRaftServer(), gid, storage);
- // get the raft configuration from raft metafile
- RaftConfiguration raftConf = storage.readRaftConfiguration();
- if (raftConf != null) {
- setRaftConf(raftConf.getLogEntryIndex(), raftConf);
- }
- }
-
void writeRaftConfiguration(LogEntryProto conf) {
storage.writeRaftConfiguration(conf);
}
@@ -364,14 +355,14 @@ class ServerState implements Closeable {
void setRaftConf(LogEntryProto entry) {
if (entry.hasConfigurationEntry()) {
- setRaftConf(entry.getIndex(), LogProtoUtils.toRaftConfiguration(entry));
+ setRaftConf(LogProtoUtils.toRaftConfiguration(entry));
}
}
- void setRaftConf(long logIndex, RaftConfiguration conf) {
- configurationManager.addConfiguration(logIndex, conf);
+ void setRaftConf(RaftConfiguration conf) {
+ configurationManager.addConfiguration(conf);
server.getServerRpc().addRaftPeers(conf.getAllPeers());
- LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
+ LOG.info("{}: set configuration {}", getMemberId(), conf);
LOG.trace("{}: {}", getMemberId(), configurationManager);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index e0c7f66..b8a1405 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -29,7 +29,7 @@ import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
index 73b3667..22d206c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.raftlog;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.RaftConfiguration;
@@ -38,21 +39,15 @@ import java.util.stream.Collectors;
public final class LogProtoUtils {
private LogProtoUtils() {}
- public static String toStateMachineLogEntryString(StateMachineLogEntryProto proto,
- Function<StateMachineLogEntryProto, String> function) {
- final ByteString clientId = proto.getClientId();
- return (clientId.isEmpty() ? "<empty ClientId>" : ClientId.valueOf(clientId))
- + ",cid=" + proto.getCallId()
- + Optional.ofNullable(function).map(f -> f.apply(proto)).map(s -> "," + s).orElse("");
- }
-
public static String toLogEntryString(LogEntryProto entry, Function<StateMachineLogEntryProto, String> function) {
if (entry == null) {
return null;
}
final String s;
if (entry.hasStateMachineLogEntry()) {
- s = ", " + toStateMachineLogEntryString(entry.getStateMachineLogEntry(), function);
+ s = ", " + Optional.ofNullable(function)
+ .orElseGet(() -> proto -> "" + ClientInvocationId.valueOf(proto))
+ .apply(entry.getStateMachineLogEntry());
} else if (entry.hasMetadataEntry()) {
final MetadataProto metadata = entry.getMetadataEntry();
s = "(c:" + metadata.getCommitIndex() + ")";
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index e27fb5b..c086e54 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -29,6 +29,7 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
@@ -82,6 +83,12 @@ public class BaseStateMachine implements StateMachine, StateMachine.DataApi,
}
@Override
+ public void initialize(Object raftServer, RaftGroupId raftGroupId, RaftStorage storage) throws IOException {
+ Preconditions.assertTrue(raftServer instanceof RaftServer,
+ () -> "Unexpected parameter " + raftServer.getClass());
+ initialize((RaftServer) raftServer, raftGroupId, storage);
+ }
+
public void initialize(RaftServer raftServer, RaftGroupId raftGroupId, RaftStorage storage) throws IOException {
this.groupId = raftGroupId;
this.server.complete(raftServer);
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
index f253728..602879a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,6 +21,7 @@ import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.AtomicFileOutputStream;
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
index c1a72c6..8cedb4a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.Preconditions;
import java.io.IOException;
@@ -41,13 +42,13 @@ public class TransactionContextImpl implements TransactionContext {
private final StateMachine stateMachine;
/** Original request from the client */
- private RaftClientRequest clientRequest;
+ private final RaftClientRequest clientRequest;
/** Exception from the {@link StateMachine} or from the log */
private Exception exception;
/** Data from the {@link StateMachine} */
- private StateMachineLogEntryProto smLogEntryProto;
+ private final StateMachineLogEntryProto stateMachineLogEntry;
/**
* Context specific to the state machine.
@@ -67,9 +68,12 @@ public class TransactionContextImpl implements TransactionContext {
/** Committed LogEntry. */
private LogEntryProto logEntry;
- private TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine) {
+ private TransactionContextImpl(RaftPeerRole serverRole, RaftClientRequest clientRequest, StateMachine stateMachine,
+ StateMachineLogEntryProto stateMachineLogEntry) {
this.serverRole = serverRole;
+ this.clientRequest = clientRequest;
this.stateMachine = stateMachine;
+ this.stateMachineLogEntry = stateMachineLogEntry;
}
/**
@@ -78,25 +82,31 @@ public class TransactionContextImpl implements TransactionContext {
* and send the Log entry representing the transaction data
* to be applied to the raft log.
*/
- public TransactionContextImpl(
- StateMachine stateMachine, RaftClientRequest clientRequest,
- StateMachineLogEntryProto smLogEntryProto, Object stateMachineContext) {
- this(RaftPeerRole.LEADER, stateMachine);
- this.clientRequest = clientRequest;
- this.smLogEntryProto = smLogEntryProto != null? smLogEntryProto
- : LogProtoUtils.toStateMachineLogEntryProto(clientRequest, null, null);
+ TransactionContextImpl(RaftClientRequest clientRequest, StateMachine stateMachine,
+ StateMachineLogEntryProto stateMachineLogEntry, ByteString logData, ByteString stateMachineData,
+ Object stateMachineContext) {
+ this(RaftPeerRole.LEADER, clientRequest, stateMachine,
+ get(stateMachineLogEntry, clientRequest, logData, stateMachineData));
this.stateMachineContext = stateMachineContext;
}
+ private static StateMachineLogEntryProto get(StateMachineLogEntryProto stateMachineLogEntry,
+ RaftClientRequest clientRequest, ByteString logData, ByteString stateMachineData) {
+ if (stateMachineLogEntry != null) {
+ return stateMachineLogEntry;
+ } else {
+ return LogProtoUtils.toStateMachineLogEntryProto(clientRequest, logData, stateMachineData);
+ }
+ }
+
/**
* Construct a {@link TransactionContext} from a {@link LogEntryProto}.
* Used by followers for applying committed entries to the state machine.
* @param logEntry the log entry to be applied
*/
- public TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) {
- this(serverRole, stateMachine);
+ TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) {
+ this(serverRole, null, stateMachine, logEntry.getStateMachineLogEntry());
this.logEntry = logEntry;
- this.smLogEntryProto = logEntry.getStateMachineLogEntry();
}
@Override
@@ -111,7 +121,7 @@ public class TransactionContextImpl implements TransactionContext {
@Override
public StateMachineLogEntryProto getStateMachineLogEntry() {
- return smLogEntryProto;
+ return stateMachineLogEntry;
}
@Override
@@ -134,14 +144,8 @@ public class TransactionContextImpl implements TransactionContext {
public LogEntryProto initLogEntry(long term, long index) {
Preconditions.assertTrue(serverRole == RaftPeerRole.LEADER);
Preconditions.assertNull(logEntry, "logEntry");
- Objects.requireNonNull(smLogEntryProto, "smLogEntryProto == null");
- return logEntry = LogProtoUtils.toLogEntryProto(smLogEntryProto, term, index);
- }
-
- @Override
- public TransactionContext setStateMachineLogEntryProto(StateMachineLogEntryProto logEntryProto) {
- this.smLogEntryProto = logEntryProto;
- return this;
+ Objects.requireNonNull(stateMachineLogEntry, "stateMachineLogEntry == null");
+ return logEntry = LogProtoUtils.toLogEntryProto(stateMachineLogEntry, term, index);
}
@Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
index 64410d8..8cc2eea 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java
@@ -24,7 +24,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorageDirectoryImpl.StorageState;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SnapshotRetentionPolicy;
+import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
import org.junit.Assert;