You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/06/07 13:24:44 UTC
incubator-ratis git commit: RATIS-240. Move stateMachine from
RaftServerProxy to RaftServerImpl.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 038ef310c -> c3407a220
RATIS-240. Move stateMachine from RaftServerProxy to RaftServerImpl.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c3407a22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c3407a22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c3407a22
Branch: refs/heads/master
Commit: c3407a2206e5f9acac20efbc6998ae19164df5ce
Parents: 038ef31
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Jun 7 21:23:39 2018 +0800
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Jun 7 21:23:39 2018 +0800
----------------------------------------------------------------------
.../org/apache/ratis/grpc/TestRaftServerWithGrpc.java | 12 ++++++------
.../main/java/org/apache/ratis/server/RaftServer.java | 6 ------
.../org/apache/ratis/server/impl/RaftServerImpl.java | 14 +++++++++-----
.../org/apache/ratis/server/impl/RaftServerProxy.java | 13 ++++---------
.../org/apache/ratis/server/impl/ServerImplUtils.java | 9 ++++++++-
.../org/apache/ratis/statemachine/StateMachine.java | 6 ++++++
.../test/java/org/apache/ratis/RaftAsyncTests.java | 4 ++--
.../apache/ratis/server/impl/RaftServerTestUtil.java | 11 -----------
.../ratis/statemachine/RaftSnapshotBaseTest.java | 4 ++--
.../statemachine/SimpleStateMachine4Testing.java | 4 ++--
.../apache/ratis/statemachine/TestStateMachine.java | 7 +++----
11 files changed, 42 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index cc8c8fb..e5e95ee 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -21,7 +21,8 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.StateMachine;
import org.junit.Test;
import java.io.IOException;
@@ -40,8 +41,8 @@ public class TestRaftServerWithGrpc extends BaseTest {
// Create a raft server proxy with server rpc bound to a different address
// compared to leader. This helps in locking the raft storage directory to
// be used by next raft server proxy instance.
- RaftServerTestUtil.getRaftServerProxy(leaderId, cluster.getLeader().getStateMachine(), cluster.getGroup(),
- new RaftProperties(), null);
+ final StateMachine stateMachine = cluster.getLeader().getStateMachine();
+ ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null);
// Close the server rpc for leader so that new raft server can be bound to it.
cluster.getLeader().getServerRpc().close();
@@ -50,9 +51,8 @@ public class TestRaftServerWithGrpc extends BaseTest {
// the raft server proxy created earlier. Raft server proxy should close
// the rpc server on failure.
testFailureCase("start a new server with the same address",
- () -> RaftServerTestUtil.getRaftServerProxy(leaderId, cluster.getLeader().getStateMachine(),
- cluster.getGroup(), properties, null),
- IOException.class, OverlappingFileLockException.class);
+ () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null),
+ IOException.class, IOException.class, OverlappingFileLockException.class);
// Try to start a raft server rpc at the leader address.
cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 085f2d1..e563705 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -48,12 +48,6 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** Start this server. */
void start();
- /**
- * Returns the StateMachine instance.
- * @return the StateMachine instance.
- */
- StateMachine getStateMachine();
-
/** @return a {@link Builder}. */
static Builder newBuilder() {
return new Builder();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 93666df..5032e88 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -67,6 +67,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
private final RaftServerProxy proxy;
+ private final StateMachine stateMachine;
private final int minTimeoutMs;
private final int maxTimeoutMs;
@@ -90,18 +91,21 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
private final RaftServerJmxAdapter jmxAdapter;
- RaftServerImpl(RaftPeerId id, RaftGroup group, RaftServerProxy proxy,
- RaftProperties properties) throws IOException {
- LOG.debug("new RaftServerImpl {}, {}", id , group);
+ RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
+ final RaftPeerId id = proxy.getId();
+ LOG.debug("{}: new RaftServerImpl for {}", id, group);
this.groupId = group.getGroupId();
this.lifeCycle = new LifeCycle(id);
+ this.stateMachine = stateMachine;
+
+ final RaftProperties properties = proxy.getProperties();
minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS);
maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
"max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
this.proxy = proxy;
- this.state = new ServerState(id, group, properties, this, proxy.getStateMachine());
+ this.state = new ServerState(id, group, properties, this, stateMachine);
this.retryCache = initRetryCache(properties);
this.jmxAdapter = new RaftServerJmxAdapter();
@@ -142,7 +146,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
public StateMachine getStateMachine() {
- return proxy.getStateMachine();
+ return stateMachine;
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 5f1717a..92ed370 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -42,8 +42,8 @@ public class RaftServerProxy implements RaftServer {
public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class);
private final RaftPeerId id;
- private final StateMachine stateMachine;
private final RaftProperties properties;
+ private final StateMachine.Registry stateMachineRegistry;
private final RaftServerRpc serverRpc;
private final ServerFactory factory;
@@ -51,11 +51,11 @@ public class RaftServerProxy implements RaftServer {
private volatile CompletableFuture<RaftServerImpl> impl;
private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>();
- RaftServerProxy(RaftPeerId id, StateMachine stateMachine,
+ RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
RaftGroup group, RaftProperties properties, Parameters parameters)
throws IOException {
this.properties = properties;
- this.stateMachine = stateMachine;
+ this.stateMachineRegistry = stateMachineRegistry;
final RpcType rpcType = RaftConfigKeys.Rpc.type(properties);
this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
@@ -78,7 +78,7 @@ public class RaftServerProxy implements RaftServer {
}
private RaftServerImpl initImpl(RaftGroup group) throws IOException {
- return new RaftServerImpl(id, group, this, properties);
+ return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
}
private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -108,11 +108,6 @@ public class RaftServerProxy implements RaftServer {
}
@Override
- public StateMachine getStateMachine() {
- return stateMachine;
- }
-
- @Override
public RaftProperties getProperties() {
return properties;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 15ee155..d9e0ee9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -30,14 +30,21 @@ import java.io.IOException;
/** Server utilities for internal use. */
public class ServerImplUtils {
+ /** For the case that all {@link RaftServerImpl} objects share the same {@link StateMachine}. */
public static RaftServerProxy newRaftServer(
RaftPeerId id, RaftGroup group, StateMachine stateMachine,
RaftProperties properties, Parameters parameters) throws IOException {
+ return newRaftServer(id, group, gid -> stateMachine, properties, parameters);
+ }
+
+ public static RaftServerProxy newRaftServer(
+ RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
+ RaftProperties properties, Parameters parameters) throws IOException {
final RaftServerProxy proxy;
try {
// attempt multiple times to avoid temporary bind exception
proxy = JavaUtils.attempt(
- () -> new RaftServerProxy(id, stateMachine, group, properties, parameters),
+ () -> new RaftServerProxy(id, stateMachineRegistry, group, properties, parameters),
5, 500L, "new RaftServerProxy", RaftServerProxy.LOG);
} catch (InterruptedException e) {
throw IOUtils.toInterruptedIOException(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 7d35796..8fa3c90 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -20,6 +20,7 @@ package org.apache.ratis.statemachine;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftConfiguration;
@@ -34,6 +35,7 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
/**
* StateMachine is the entry point for the custom implementation of replicated state as defined in
@@ -43,6 +45,10 @@ import java.util.concurrent.CompletableFuture;
public interface StateMachine extends Closeable {
Logger LOG = LoggerFactory.getLogger(StateMachine.class);
+ /** A registry to support different state machines in multi-raft environment. */
+ interface Registry extends Function<RaftGroupId, StateMachine> {
+ }
+
/**
* Initializes the State Machine with the given properties and storage. The state machine is
* responsible reading the latest snapshot from the file system (if any) and initialize itself
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index a343f76..53e0f1f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -103,7 +103,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
final RaftClient client = cluster.createClient();
//Set blockTransaction flag so that transaction blocks
for (RaftServerProxy server : cluster.getServers()) {
- ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(true);
+ ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(true);
}
//Send numMessages which are blocked and do not release the client semaphore permits
@@ -133,7 +133,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
//Unset the blockTransaction flag so that semaphore permits can be released
for (RaftServerProxy server : cluster.getServers()) {
- ((SimpleStateMachine4Testing) server.getStateMachine()).setBlockTransaction(false);
+ ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(false);
}
for(int i=0; i<=numMessages; i++){
futures[i].join();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index f84576d..a72e6f5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -18,19 +18,13 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Collection;
import java.util.stream.Stream;
@@ -86,11 +80,6 @@ public class RaftServerTestUtil {
return entry.isFailed();
}
- public static RaftServerProxy getRaftServerProxy(RaftPeerId id, StateMachine stateMachine,
- RaftGroup group, RaftProperties properties, Parameters parameters) throws IOException {
- return new RaftServerProxy(id, stateMachine, group, properties, parameters);
- }
-
public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
return server.getLeaderState().getLogAppenders();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 387319d..ef018e5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -57,7 +57,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
static File getSnapshotFile(MiniRaftCluster cluster, int i) {
final RaftServerImpl leader = cluster.getLeader();
- final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader.getProxy());
+ final SimpleStateMachine4Testing sm = SimpleStateMachine4Testing.get(leader);
return sm.getStateMachineStorage().getSnapshotFile(
leader.getState().getCurrentTerm(), i);
}
@@ -67,7 +67,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
Assert.assertEquals(SNAPSHOT_TRIGGER_THRESHOLD * 2,
leader.getState().getLog().getLastCommittedIndex());
- final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader.getProxy()).getContent();
+ final LogEntryProto[] entries = SimpleStateMachine4Testing.get(leader).getContent();
for (int i = 1; i < SNAPSHOT_TRIGGER_THRESHOLD * 2 - 1; i++) {
Assert.assertEquals(i+1, entries[i].getIndex());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 91643db..5d66ca3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -24,9 +24,9 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
-import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.LogInputStream;
import org.apache.ratis.server.storage.LogOutputStream;
@@ -61,7 +61,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
= "raft.test.simple.state.machine.take.snapshot";
private static final boolean RAFT_TEST_SIMPLE_STATE_MACHINE_TAKE_SNAPSHOT_DEFAULT = false;
- public static SimpleStateMachine4Testing get(RaftServer s) {
+ public static SimpleStateMachine4Testing get(RaftServerImpl s) {
return (SimpleStateMachine4Testing)s.getStateMachine();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c3407a22/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 611435a..a045ecd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -25,7 +25,6 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
@@ -95,7 +94,7 @@ public class TestStateMachine extends BaseTest {
}
static class SMTransactionContext extends SimpleStateMachine4Testing {
- public static SMTransactionContext get(RaftServer s) {
+ public static SMTransactionContext get(RaftServerImpl s) {
return (SMTransactionContext)s.getStateMachine();
}
@@ -167,7 +166,7 @@ public class TestStateMachine extends BaseTest {
Thread.sleep(cluster.getMaxTimeout() + 100);
for (RaftServerProxy raftServer : cluster.getServers()) {
- final SMTransactionContext sm = SMTransactionContext.get(raftServer);
+ final SMTransactionContext sm = SMTransactionContext.get(raftServer.getImpl());
sm.rethrowIfException();
assertEquals(numTrx, sm.numApplied.get());
}
@@ -175,7 +174,7 @@ public class TestStateMachine extends BaseTest {
// check leader
RaftServerImpl raftServer = cluster.getLeader();
// assert every transaction has obtained context in leader
- final SMTransactionContext sm = SMTransactionContext.get(raftServer.getProxy());
+ final SMTransactionContext sm = SMTransactionContext.get(raftServer);
List<Long> ll = sm.applied.stream().collect(Collectors.toList());
Collections.sort(ll);
assertEquals(ll.toString(), ll.size(), numTrx);