You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/09/03 09:01:18 UTC
incubator-ratis git commit: RATIS-300. Support multiple RaftServerImpl in RaftServerProxy. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master c6c9ddf4d -> acd507e6e
RATIS-300. Support multiple RaftServerImpl in RaftServerProxy. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/acd507e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/acd507e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/acd507e6
Branch: refs/heads/master
Commit: acd507e6e2a5520682aa6ba37c5042a377f01cd0
Parents: c6c9ddf
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Sep 3 14:30:58 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Sep 3 14:30:58 2018 +0530
----------------------------------------------------------------------
.../ratis/protocol/AlreadyClosedException.java | 27 ++
.../java/org/apache/ratis/util/IOUtils.java | 18 +-
.../java/org/apache/ratis/util/JavaUtils.java | 17 +-
.../java/org/apache/ratis/util/LifeCycle.java | 3 +-
.../org/apache/ratis/util/Preconditions.java | 8 +
.../org/apache/ratis/TestRestartRaftPeer.java | 4 +-
.../ratis/examples/ParameterizedBaseTest.java | 2 +-
.../org/apache/ratis/grpc/RaftGRpcService.java | 14 +-
.../ratis/grpc/server/GRpcLogAppender.java | 2 +-
.../ratis/grpc/TestRaftServerWithGrpc.java | 4 +-
.../hadooprpc/server/HadoopRpcService.java | 2 +-
.../ratis/netty/server/NettyRpcService.java | 2 +-
.../org/apache/ratis/server/RaftServer.java | 5 +-
.../org/apache/ratis/server/RaftServerRpc.java | 3 +-
.../ratis/server/impl/RaftServerImpl.java | 20 +-
.../ratis/server/impl/RaftServerProxy.java | 301 +++++++++++++------
.../server/impl/RaftServerRpcWithProxy.java | 11 +-
.../org/apache/ratis/server/impl/RoleInfo.java | 5 +
.../ratis/server/impl/ServerImplUtils.java | 23 +-
.../apache/ratis/server/storage/RaftLog.java | 2 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 79 ++---
.../java/org/apache/ratis/RaftAsyncTests.java | 18 +-
.../java/org/apache/ratis/RaftBasicTests.java | 4 +-
.../java/org/apache/ratis/RaftTestUtil.java | 13 +-
.../TestRaftServerLeaderElectionTimeout.java | 3 +-
.../ratis/TestRaftServerSlownessDetection.java | 2 +-
.../ratis/server/impl/LeaderElectionTests.java | 5 +-
.../impl/RaftReconfigurationBaseTest.java | 3 +-
.../ratis/server/impl/RaftServerTestUtil.java | 10 +
.../server/impl/ReinitializationBaseTest.java | 11 +-
.../server/simulation/SimulatedServerRpc.java | 10 +-
.../statemachine/RaftSnapshotBaseTest.java | 3 +-
.../ratis/statemachine/TestStateMachine.java | 28 +-
33 files changed, 426 insertions(+), 236 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
new file mode 100644
index 0000000..85888a0
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/AlreadyClosedException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * The corresponding object is already closed.
+ */
+public class AlreadyClosedException extends RaftException {
+ public AlreadyClosedException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
index 915f4a2..c560bc6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/IOUtils.java
@@ -20,6 +20,8 @@
package org.apache.ratis.util;
+import org.slf4j.Logger;
+
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
@@ -30,10 +32,10 @@ import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
-import org.slf4j.Logger;
-
/**
* IO related utility methods.
*/
@@ -56,6 +58,18 @@ public interface IOUtils {
return cause != null? asIOException(cause): new IOException(e);
}
+ static <T> T getFromFuture(CompletableFuture<T> future, Object name) throws IOException {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw toInterruptedIOException(name + " interrupted.", e);
+ } catch (ExecutionException e) {
+ throw toIOException(e);
+ } catch (CompletionException e) {
+ throw asIOException(JavaUtils.unwrapCompletionException(e));
+ }
+ }
+
static boolean shouldReconnect(Exception e) {
return ReflectionUtils.isInstance(e,
SocketException.class, SocketTimeoutException.class, ClosedChannelException.class, EOFException.class);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 15cb4f6..dd8eb39 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -39,6 +39,7 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
+import java.util.function.Function;
import java.util.function.Supplier;
/**
@@ -67,12 +68,20 @@ public interface JavaUtils {
* wrap the checked exception by {@link RuntimeException}.
*/
static <T> T callAsUnchecked(Callable<T> callable) {
+ return callAsUnchecked(callable::call, RuntimeException::new);
+ }
+
+ static <OUTPUT, THROWABLE extends Throwable> OUTPUT callAsUnchecked(
+ CheckedSupplier<OUTPUT, THROWABLE> checkedSupplier,
+ Function<THROWABLE, ? extends RuntimeException> converter) {
try {
- return callable.call();
- } catch (RuntimeException e) {
+ return checkedSupplier.get();
+ } catch(RuntimeException | Error e) {
throw e;
- } catch (Exception e) {
- throw new RuntimeException(e);
+ } catch(Throwable t) {
+ @SuppressWarnings("unchecked")
+ final THROWABLE casted = (THROWABLE)t;
+ throw converter.apply(casted);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
index 93af1ef..f8f3648 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
@@ -158,12 +158,13 @@ public class LifeCycle {
}
/** Assert if the current state equals to one of the expected states. */
- public <T extends Throwable> void assertCurrentState(
+ public <T extends Throwable> State assertCurrentState(
BiFunction<String, State, T> newThrowable, State... expected) throws T {
final State c = getCurrentState();
if (!c.isOneOf(expected)) {
throw newThrowable.apply(name, c);
}
+ return c;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index f1d55b0..7af2201 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -72,4 +72,12 @@ public interface Preconditions {
throw new IllegalStateException(String.valueOf(message.get()));
}
}
+
+ static void assertNull(Object object, String name) {
+ if (object != null) {
+ throw new IllegalStateException(
+ name + " is expected to be null but "
+ + name + " = " + object + " != null, class = " + object.getClass());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
index 4b7ed40..ccbbda0 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/TestRestartRaftPeer.java
@@ -93,14 +93,14 @@ public class TestRestartRaftPeer extends BaseTest {
long lastAppliedIndex = 0;
for (int i = 0; i < 10 && !catchup; i++) {
Thread.sleep(500);
- lastAppliedIndex = cluster.getServer(followerId).getImpl().getState().getLastAppliedIndex();
+ lastAppliedIndex = cluster.getRaftServerImpl(followerId).getState().getLastAppliedIndex();
catchup = lastAppliedIndex >= 20;
}
Assert.assertTrue("lastAppliedIndex=" + lastAppliedIndex, catchup);
// make sure the restarted peer's log segments is correct
cluster.restartServer(followerId, false);
- Assert.assertTrue(cluster.getServer(followerId).getImpl().getState().getLog()
+ Assert.assertTrue(cluster.getRaftServerImpl(followerId).getState().getLog()
.getLastEntryTermIndex().getIndex() >= 20);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
index 3ee5e05..057c73a 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/ParameterizedBaseTest.java
@@ -51,7 +51,7 @@ public abstract class ParameterizedBaseTest extends BaseTest {
private static final AtomicReference<MiniRaftCluster> currentCluster = new AtomicReference<>();
/** Set {@link #currentCluster} to the given cluster and start it if {@link #currentCluster} is changed. */
- public static void setAndStart(MiniRaftCluster cluster) throws InterruptedException {
+ public static void setAndStart(MiniRaftCluster cluster) throws InterruptedException, IOException {
final MiniRaftCluster previous = currentCluster.getAndSet(cluster);
if (previous != cluster) {
if (previous != null) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index fefec48..b3e514c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -111,11 +111,17 @@ public class RaftGRpcService extends RaftServerRpcWithProxy<RaftServerProtocolCl
}
@Override
- public void closeImpl() {
- if (server != null) {
- server.shutdown();
- }
+ public void closeImpl() throws IOException {
+ final String name = getId() + ": shutdown server with port " + server.getPort();
+ LOG.info("{} now", name);
+ final Server s = server.shutdownNow();
super.closeImpl();
+ try {
+ s.awaitTermination();
+ } catch(InterruptedException e) {
+ throw IOUtils.toInterruptedIOException(name + " failed", e);
+ }
+ LOG.info("{} successfully", name);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 595e061..f060c24 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -303,7 +303,7 @@ public class GRpcLogAppender extends LogAppender {
AppendEntriesRequestProto request = pendingRequests.remove(reply.getServerReply().getCallId());
if (request == null) {
// If reply comes after timeout, the reply is ignored.
- LOG.warn("Ignoring reply: " + reply);
+ LOG.warn("{}: Ignoring {}", server.getId(), reply);
return;
}
Preconditions.assertTrue(request.hasPreviousLog());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index e5e95ee..7173e1f 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -51,8 +51,8 @@ public class TestRaftServerWithGrpc extends BaseTest {
// the raft server proxy created earlier. Raft server proxy should close
// the rpc server on failure.
testFailureCase("start a new server with the same address",
- () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null),
- IOException.class, IOException.class, OverlappingFileLockException.class);
+ () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), stateMachine, properties, null).start(),
+ IOException.class, OverlappingFileLockException.class);
// Try to start a raft server rpc at the leader address.
cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index 5e571d4..5d07752 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -149,7 +149,7 @@ public class HadoopRpcService extends RaftServerRpcWithProxy<Proxy<RaftServerPro
}
@Override
- public void closeImpl() {
+ public void closeImpl() throws IOException {
ipcServer.stop();
super.closeImpl();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index ebe3184..45c0b77 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -134,7 +134,7 @@ public final class NettyRpcService extends RaftServerRpcWithProxy<NettyRpcProxy,
}
@Override
- public void closeImpl() {
+ public void closeImpl() throws IOException {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
final ChannelFuture f = getChannel().close();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index acd54cb..e071d4b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LifeCycle;
import java.io.Closeable;
import java.io.IOException;
@@ -50,7 +51,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
ServerFactory getFactory();
/** Start this server. */
- void start();
+ void start() throws IOException;
+
+ LifeCycle.State getLifeCycleState();
/** @return a {@link Builder}. */
static Builder newBuilder() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index 33db36d..a85e606 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import java.io.Closeable;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;
@@ -51,7 +52,7 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl
}
/** Start the RPC service. */
- void start();
+ void start() throws IOException;
/** @return the address where this RPC server is listening to. */
InetSocketAddress getInetSocketAddress();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index fd0ee4a..e33177a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -166,8 +166,11 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
this.role.transitionRole(newRole);
}
- void start() {
- lifeCycle.transition(STARTING);
+ boolean start() {
+ if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
+ return false;
+ }
+ LOG.info("{}: start {}", getId(), groupId);
state.start();
RaftConfiguration conf = getRaftConf();
if (conf != null && conf.contains(getId())) {
@@ -179,6 +182,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
registerMBean(getId(), getGroupId(), jmxAdapter, jmxAdapter);
+ return true;
}
static boolean registerMBean(
@@ -235,6 +239,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
void shutdown() {
lifeCycle.checkStateAndClose(() -> {
+ LOG.info("{}: shutdown {}", getId(), groupId);
try {
jmxAdapter.unregister();
} catch (Exception ignored) {
@@ -490,8 +495,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
peers.toArray(new RaftPeer[peers.size()]));
}
- private void assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException {
- lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n
+ private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected) throws ServerNotReadyException {
+ return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException("Server " + n
+ " is not " + Arrays.toString(expected) + ": current state is " + c),
expected);
}
@@ -866,7 +871,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
+ ", commits" + ProtoUtils.toString(commitInfos)
+ ", entries: " + ServerProtoUtils.toString(entries));
- assertLifeCycleState(STARTING, RUNNING);
+ final LifeCycle.State currentState = assertLifeCycleState(STARTING, RUNNING);
+ if (currentState == STARTING) {
+ if (role.getCurrentRole() == null) {
+ throw new ServerNotReadyException("The role of Server " + getId() + " is not yet initialized.");
+ }
+ }
assertGroup(leaderId, leaderGroupId);
try {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 9db7735..27ec67f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -26,35 +26,132 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.CheckedFunction;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Collections;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
public class RaftServerProxy implements RaftServer {
public static final Logger LOG = LoggerFactory.getLogger(RaftServerProxy.class);
+ /**
+ * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
+ *
+ * The map is synchronized for mutations and the bulk {@link #getAll()} method
+ * but the (non-bulk) {@link #get(RaftGroupId)} and {@link #containsGroup(RaftGroupId)} methods are not.
+ * The thread safety and atomicity guarantees for the non-bulk methods are provided by {@link ConcurrentMap}.
+ */
+ class ImplMap implements Closeable {
+ private final ConcurrentMap<RaftGroupId, CompletableFuture<RaftServerImpl>> map = new ConcurrentHashMap<>();
+ private boolean isClosed = false;
+
+ synchronized CompletableFuture<RaftServerImpl> addNew(RaftGroup group) {
+ if (isClosed) {
+ return JavaUtils.completeExceptionally(new AlreadyClosedException(
+ getId() + ": Failed to add " + group + " since the server is already closed"));
+ }
+ if (containsGroup(group.getGroupId())) {
+ return JavaUtils.completeExceptionally(new AlreadyExistsException(
+ getId() + ": Failed to add " + group + " since the group already exists in the map."));
+ }
+ final RaftGroupId groupId = group.getGroupId();
+ final CompletableFuture<RaftServerImpl> newImpl = newRaftServerImpl(group);
+ final CompletableFuture<RaftServerImpl> previous = map.put(groupId, newImpl);
+ Preconditions.assertNull(previous, "previous");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: addNew {} returns {}", getId(), group, toString(groupId, newImpl));
+ }
+ return newImpl;
+ }
+
+ synchronized CompletableFuture<RaftServerImpl> remove(RaftGroupId groupId) {
+ final CompletableFuture<RaftServerImpl> future = map.remove(groupId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("{}: remove {}", getId(), toString(groupId, future));
+ }
+ return future;
+ }
+
+ @Override
+ public synchronized void close() {
+ if (isClosed) {
+ LOG.info("{} is already closed.", getId());
+ return;
+ }
+ isClosed = true;
+ map.values().parallelStream().map(CompletableFuture::join).forEach(RaftServerImpl::shutdown);
+ }
+
+ synchronized List<CompletableFuture<RaftServerImpl>> getAll() {
+ return new ArrayList<>(map.values());
+ }
+
+ CompletableFuture<RaftServerImpl> get(RaftGroupId groupId) {
+ final CompletableFuture<RaftServerImpl> i = map.get(groupId);
+ if (i == null) {
+ return JavaUtils.completeExceptionally(new GroupMismatchException(
+ getId() + ": " + groupId + " not found."));
+ }
+ return i;
+ }
+
+ boolean containsGroup(RaftGroupId groupId) {
+ return map.containsKey(groupId);
+ }
+
+ @Override
+ public synchronized String toString() {
+ if (map.isEmpty()) {
+ return "<EMPTY>";
+ } else if (map.size() == 1) {
+ return toString(map.entrySet().iterator().next());
+ }
+ final StringBuilder b = new StringBuilder("[");
+ map.entrySet().forEach(e -> b.append("\n ").append(toString(e)));
+ return b.append("] size=").append(map.size()).toString();
+ }
+
+ String toString(Map.Entry<RaftGroupId, CompletableFuture<RaftServerImpl>> e) {
+ return toString(e.getKey(), e.getValue());
+ }
+
+ String toString(RaftGroupId groupId, CompletableFuture<RaftServerImpl> f) {
+ return "" + (f != null && f.isDone()? f.join(): groupId + ":" + f);
+ }
+ }
+
private final RaftPeerId id;
private final RaftProperties properties;
private final StateMachine.Registry stateMachineRegistry;
+ private final LifeCycle lifeCycle;
private final RaftServerRpc serverRpc;
private final ServerFactory factory;
- private volatile CompletableFuture<RaftServerImpl> impl;
+ private final ImplMap impls = new ImplMap();
private final AtomicReference<ReinitializeRequest> reinitializeRequest = new AtomicReference<>();
RaftServerProxy(RaftPeerId id, StateMachine.Registry stateMachineRegistry,
- RaftGroup group, RaftProperties properties, Parameters parameters)
- throws IOException {
+ RaftProperties properties, Parameters parameters) {
this.properties = properties;
this.stateMachineRegistry = stateMachineRegistry;
@@ -63,23 +160,18 @@ public class RaftServerProxy implements RaftServer {
this.serverRpc = factory.newRaftServerRpc(this);
this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
+ this.lifeCycle = new LifeCycle(this.id);
+ }
- try {
- this.impl = CompletableFuture.completedFuture(initImpl(group));
- } catch (IOException ioe) {
+ private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
+ return CompletableFuture.supplyAsync(() -> {
try {
- serverRpc.close();
- } catch (IOException closeIoe) {
- LOG.warn(this.id + ": Failed to close server rpc.", closeIoe);
- ioe.addSuppressed(closeIoe);
- } finally {
- throw ioe;
+ serverRpc.addPeers(group.getPeers());
+ return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
+ } catch(IOException e) {
+ throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
}
- }
- }
-
- private RaftServerImpl initImpl(RaftGroup group) throws IOException {
- return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
+ });
}
private static String getIdStringFrom(RaftServerRpc rpc) {
@@ -98,8 +190,9 @@ public class RaftServerProxy implements RaftServer {
return id;
}
+ @Override
public Iterable<RaftGroupId> getGroupIds() throws IOException {
- return Collections.singleton(getImpl().getGroupId());
+ return getImpls().stream().map(RaftServerImpl::getGroupId).collect(Collectors.toList());
}
@Override
@@ -121,54 +214,85 @@ public class RaftServerProxy implements RaftServer {
return serverRpc;
}
- public RaftServerImpl getImpl() throws IOException {
- final CompletableFuture<RaftServerImpl> i = impl;
- if (i == null) {
- throw new ServerNotReadyException(getId() + " is not initialized.");
- }
- try {
- return i.get();
- } catch (InterruptedException e) {
- throw IOUtils.toInterruptedIOException(getId() + ": getImpl interrupted.", e);
- } catch (ExecutionException e) {
- throw IOUtils.asIOException(e);
+ public boolean containsGroup(RaftGroupId groupId) {
+ return impls.containsGroup(groupId);
+ }
+
+ CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+ return impls.addNew(group);
+ }
+
+ private CompletableFuture<RaftServerImpl> getImplFuture(RaftGroupId groupId) {
+ return impls.get(groupId);
+ }
+
+ private RaftServerImpl getImpl(RaftRpcRequestProto proto) throws IOException {
+ return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId()));
+ }
+
+ RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
+ Objects.requireNonNull(groupId, "groupId == null");
+ return IOUtils.getFromFuture(getImplFuture(groupId), getId());
+ }
+
+ List<RaftServerImpl> getImpls() throws IOException {
+ final List<RaftServerImpl> list = new ArrayList<>();
+ for(CompletableFuture<RaftServerImpl> f : impls.getAll()) {
+ list.add(IOUtils.getFromFuture(f, getId()));
}
+ return list;
}
@Override
- public void start() {
- LOG.info("{}: start", getId());
- JavaUtils.getAndConsume(impl, RaftServerImpl::start);
- getServerRpc().start();
+ public LifeCycle.State getLifeCycleState() {
+ return lifeCycle.getCurrentState();
+ }
+
+ @Override
+ public void start() throws IOException {
+ getImpls().parallelStream().forEach(RaftServerImpl::start);
+
+ lifeCycle.startAndTransition(() -> {
+ LOG.info("{}: start RPC server", getId());
+ getServerRpc().start();
+ }, IOException.class);
}
@Override
public void close() {
- LOG.info("{}: close", getId());
- JavaUtils.getAndConsume(impl, RaftServerImpl::shutdown);
- try {
- getServerRpc().close();
- } catch (IOException ignored) {
- LOG.warn("Failed to close RPC server for " + getId(), ignored);
- }
+ lifeCycle.checkStateAndClose(() -> {
+ LOG.info("{}: close", getId());
+ impls.close();
+
+ try {
+ getServerRpc().close();
+ } catch(IOException ignored) {
+ LOG.warn(getId() + ": Failed to close " + getRpcType() + " server", ignored);
+ }
+ });
+ }
+
+ private <REPLY> CompletableFuture<REPLY> submitRequest(RaftGroupId groupId,
+ CheckedFunction<RaftServerImpl, CompletableFuture<REPLY>, IOException> submitFunction) {
+ return getImplFuture(groupId).thenCompose(
+ impl -> JavaUtils.callAsUnchecked(() -> submitFunction.apply(impl), CompletionException::new));
}
@Override
- public CompletableFuture<RaftClientReply> submitClientRequestAsync(
- RaftClientRequest request) throws IOException {
- return getImpl().submitClientRequestAsync(request);
+ public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
+ return submitRequest(request.getRaftGroupId(), impl -> impl.submitClientRequestAsync(request));
}
@Override
public RaftClientReply submitClientRequest(RaftClientRequest request)
throws IOException {
- return getImpl().submitClientRequest(request);
+ return getImpl(request.getRaftGroupId()).submitClientRequest(request);
}
@Override
public RaftClientReply setConfiguration(SetConfigurationRequest request)
throws IOException {
- return getImpl().setConfiguration(request);
+ return getImpl(request.getRaftGroupId()).setConfiguration(request);
}
@Override
@@ -180,37 +304,29 @@ public class RaftServerProxy implements RaftServer {
@Override
public CompletableFuture<RaftClientReply> reinitializeAsync(
ReinitializeRequest request) throws IOException {
- LOG.info("{}: reinitializeAsync {}", getId(), request);
- getImpl().assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ LOG.info("{}: reinitialize* {}", getId(), request);
if (!reinitializeRequest.compareAndSet(null, request)) {
throw new IOException("Another reinitialize is already in progress.");
}
-
- return CompletableFuture.supplyAsync(() -> {
- try {
- final CompletableFuture<RaftServerImpl> oldImpl = impl;
- impl = new CompletableFuture<>();
- JavaUtils.getAndConsume(oldImpl, RaftServerImpl::shutdown);
-
- final RaftServerImpl newImpl;
- try {
- newImpl = initImpl(request.getGroup());
- } catch (IOException ioe) {
- final RaftException re = new RaftException(
- "Failed to reinitialize, request=" + request, ioe);
- impl.completeExceptionally(new IOException(
- "Server " + getId() + " is not initialized.", re));
- return new RaftClientReply(request, re, null);
- }
-
- getServerRpc().addPeers(request.getGroup().getPeers());
- newImpl.start();
- impl.complete(newImpl);
- return new RaftClientReply(request, newImpl.getCommitInfos());
- } finally {
- reinitializeRequest.set(null);
- }
- });
+ final RaftGroupId oldGroupId = request.getRaftGroupId();
+ return getImplFuture(oldGroupId)
+ .thenAcceptAsync(RaftServerImpl::shutdown)
+ .thenAccept(_1 -> impls.remove(oldGroupId))
+ .thenCompose(_1 -> impls.addNew(request.getGroup()))
+ .thenApply(newImpl -> {
+ LOG.debug("{}: newImpl = {}", getId(), newImpl);
+ final boolean started = newImpl.start();
+ Preconditions.assertTrue(started, () -> getId()+ ": failed to start a new impl: " + newImpl);
+ return new RaftClientReply(request, newImpl.getCommitInfos());
+ })
+ .whenComplete((_1, throwable) -> {
+ if (throwable != null) {
+ impls.remove(request.getGroup().getGroupId());
+ LOG.warn(getId() + ": Failed reinitialize* " + request, throwable);
+ }
+
+ reinitializeRequest.set(null);
+ });
}
@Override
@@ -223,48 +339,41 @@ public class RaftServerProxy implements RaftServer {
@Override
public CompletableFuture<ServerInformationReply> getInfoAsync(
ServerInformationRequest request) {
- return impl.thenApply(server -> server.getServerInformation(request));
+ return getImplFuture(request.getRaftGroupId()).thenApplyAsync(
+ server -> server.getServerInformation(request));
}
/**
* Handle a raft configuration change request from client.
*/
@Override
- public CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException {
- return getImpl().setConfigurationAsync(request);
+ public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
+ return submitRequest(request.getRaftGroupId(), impl -> impl.setConfigurationAsync(request));
}
@Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
- throws IOException {
- return getImpl().requestVote(r);
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) throws IOException {
+ return getImpl(request.getServerRequest()).requestVote(request);
}
@Override
- public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
- AppendEntriesRequestProto r) throws IOException {
- return getImpl().appendEntriesAsync(r);
+ public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
+ final RaftGroupId groupId = ProtoUtils.toRaftGroupId(request.getServerRequest().getRaftGroupId());
+ return submitRequest(groupId, impl -> impl.appendEntriesAsync(request));
}
@Override
- public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
- throws IOException {
- return getImpl().appendEntries(r);
+ public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) throws IOException {
+ return getImpl(request.getServerRequest()).appendEntries(request);
}
@Override
- public InstallSnapshotReplyProto installSnapshot(
- InstallSnapshotRequestProto request) throws IOException {
- return getImpl().installSnapshot(request);
+ public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
+ return getImpl(request.getServerRequest()).installSnapshot(request);
}
@Override
public String toString() {
- try {
- return getImpl().toString();
- } catch (IOException ignored) {
- return getClass().getSimpleName() + ":" + getId();
- }
+ return getId() + String.format(":%9s ", lifeCycle.getCurrentState()) + impls;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
index aa8e2f7..68cf6fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
@@ -25,6 +25,7 @@ import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.PeerProxyMap;
import java.io.Closeable;
+import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -64,18 +65,18 @@ public abstract class RaftServerRpcWithProxy<PROXY extends Closeable, PROXIES ex
}
@Override
- public final void start() {
- getLifeCycle().startAndTransition(() -> startImpl());
+ public final void start() throws IOException {
+ getLifeCycle().startAndTransition(this::startImpl, IOException.class);
}
- public abstract void startImpl();
+ public abstract void startImpl() throws IOException;
@Override
- public final void close() {
+ public final void close() throws IOException{
getLifeCycle().checkStateAndClose(() -> closeImpl());
}
- public void closeImpl() {
+ public void closeImpl() throws IOException {
getProxies().close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index e7a6e80..20ec951 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -59,4 +59,9 @@ public class RoleInfo {
public boolean isLeader() {
return role == RaftPeerRole.LEADER;
}
+
+ @Override
+ public String toString() {
+ return "" + role;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index d9e0ee9..7b80dcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -34,27 +34,26 @@ public class ServerImplUtils {
public static RaftServerProxy newRaftServer(
RaftPeerId id, RaftGroup group, StateMachine stateMachine,
RaftProperties properties, Parameters parameters) throws IOException {
- return newRaftServer(id, group, gid -> stateMachine, properties, parameters);
+ RaftServerProxy.LOG.debug("newRaftServer: {}, {}", id, group);
+ final RaftServerProxy proxy = newRaftServer(id, gid -> stateMachine, properties, parameters);
+ if (group != null) {
+ proxy.addGroup(group);
+ }
+ return proxy;
}
- public static RaftServerProxy newRaftServer(
- RaftPeerId id, RaftGroup group, StateMachine.Registry stateMachineRegistry,
- RaftProperties properties, Parameters parameters) throws IOException {
+ private static RaftServerProxy newRaftServer(
+ RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftProperties properties, Parameters parameters)
+ throws IOException {
final RaftServerProxy proxy;
try {
// attempt multiple times to avoid temporary bind exception
proxy = JavaUtils.attempt(
- () -> new RaftServerProxy(id, stateMachineRegistry, group, properties, parameters),
+ () -> new RaftServerProxy(id, stateMachineRegistry, properties, parameters),
5, 500L, "new RaftServerProxy", RaftServerProxy.LOG);
} catch (InterruptedException e) {
throw IOUtils.toInterruptedIOException(
- "Interrupted when creating RaftServer " + id + ", " + group, e);
- } catch (IOException e) {
- throw new IOException("Failed to create RaftServer " + id + ", " + group, e);
- }
- // add peers into rpc service
- if (!group.getPeers().isEmpty()) {
- proxy.getServerRpc().addPeers(group.getPeers());
+ "Interrupted when creating RaftServer " + id, e);
}
return proxy;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 0d2ec4c..296dce6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -77,7 +77,7 @@ public abstract class RaftLog implements Closeable {
public void checkLogState() {
Preconditions.assertTrue(isOpen,
- "The RaftLog has not been opened or has been closed");
+ () -> getSelfId() + ": The RaftLog has not been opened or has been closed");
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 3806bb8..8104938 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -26,6 +26,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.storage.MemoryRaftLog;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
@@ -147,6 +148,7 @@ public abstract class MiniRaftCluster {
protected MiniRaftCluster(String[] ids, RaftProperties properties, Parameters parameters) {
this.group = initRaftGroup(Arrays.asList(ids));
+ LOG.info("new MiniRaftCluster {}", group);
this.properties = new RaftProperties(properties);
this.parameters = parameters;
@@ -160,6 +162,7 @@ public abstract class MiniRaftCluster {
}
public MiniRaftCluster initServers() {
+ LOG.info("servers = " + servers);
if (servers.isEmpty()) {
putNewServers(CollectionUtils.as(group.getPeers(), RaftPeer::getId), true);
}
@@ -183,7 +186,7 @@ public abstract class MiniRaftCluster {
.collect(Collectors.toList());
}
- public void start() {
+ public void start() throws IOException {
LOG.info(".............................................................. ");
LOG.info("... ");
LOG.info("... Starting " + getClass().getSimpleName());
@@ -191,7 +194,7 @@ public abstract class MiniRaftCluster {
LOG.info(".............................................................. ");
initServers();
- servers.values().forEach(RaftServer::start);
+ startServers(servers.values());
}
/**
@@ -201,7 +204,7 @@ public abstract class MiniRaftCluster {
killServer(newId);
servers.remove(newId);
- startServer(putNewServer(newId, format), true);
+ putNewServer(newId, format).start();
}
public void restart(boolean format) throws IOException {
@@ -217,8 +220,8 @@ public abstract class MiniRaftCluster {
return RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
}
- private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group,
- boolean format) {
+ private RaftServerProxy newRaftServer(RaftPeerId id, RaftGroup group, boolean format) {
+ LOG.info("newRaftServer: {}, {}, format? {}", id, group, format);
try {
final File dir = getStorageDir(id);
if (format) {
@@ -280,14 +283,14 @@ public abstract class MiniRaftCluster {
return addNewPeers(generateIds(number, servers.size()), startNewPeer);
}
- public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) {
+ public PeerChanges addNewPeers(String[] ids, boolean startNewPeer) throws IOException {
LOG.info("Add new peers {}", Arrays.asList(ids));
// create and add new RaftServers
final Collection<RaftServerProxy> newServers = putNewServers(
CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
- newServers.forEach(s -> startServer(s, true));
+ startServers(newServers);
if (!startNewPeer) {
// start and then close, in order to bind the port
newServers.forEach(p -> p.close());
@@ -301,16 +304,12 @@ public abstract class MiniRaftCluster {
return new PeerChanges(p, np, new RaftPeer[0]);
}
- protected void startServer(RaftServer server, boolean startService) {
- if (startService) {
- server.start();
+ static void startServers(Iterable<? extends RaftServer> servers) throws IOException {
+ for(RaftServer s : servers) {
+ s.start();
}
}
- public void startServer(RaftPeerId id) {
- startServer(getServer(id), true);
- }
-
/**
* prepare the peer list when removing some peers from the conf
*/
@@ -356,18 +355,7 @@ public abstract class MiniRaftCluster {
} else {
b.append("ALL groups");
}
- getServers().stream().filter(
- s -> {
- if (groupId == null) {
- return true;
- }
- try {
- return groupId.equals(s.getImpl().getGroupId());
- } catch (IOException e) {
- return false;
- }
- })
- .forEach(s -> b.append("\n ").append(s));
+ getRaftServerProxyStream(groupId).forEach(s -> b.append("\n ").append(s));
return b.toString();
}
@@ -407,11 +395,7 @@ public abstract class MiniRaftCluster {
}
public RaftServerImpl getLeader(RaftGroupId groupId) {
- Stream<RaftServerImpl> stream = getServerAliveStream();
- if (groupId != null) {
- stream = stream.filter(s -> groupId.equals(s.getGroupId()));
- }
- return getLeader(stream);
+ return getLeader(getServerAliveStream(groupId));
}
static RaftServerImpl getLeader(Stream<RaftServerImpl> serverAliveStream) {
@@ -439,7 +423,7 @@ public abstract class MiniRaftCluster {
return leaders.get(0);
}
- boolean isLeader(String leaderId) throws InterruptedException {
+ boolean isLeader(String leaderId) {
final RaftServerImpl leader = getLeader();
return leader != null && leader.getId().toString().equals(leaderId);
}
@@ -454,24 +438,41 @@ public abstract class MiniRaftCluster {
return servers.values();
}
- public Iterable<RaftServerImpl> iterateServerImpls() {
- return CollectionUtils.as(getServers(), RaftTestUtil::getImplAsUnchecked);
+ private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
+ return getServers().stream()
+ .filter(s -> groupId == null || s.containsGroup(groupId));
}
- public static Stream<RaftServerImpl> getServerStream(Collection<RaftServerProxy> servers) {
- return servers.stream().map(RaftTestUtil::getImplAsUnchecked);
+ public Iterable<RaftServerImpl> iterateServerImpls() {
+ return CollectionUtils.as(getServers(), this::getRaftServerImpl);
}
- public Stream<RaftServerImpl> getServerStream() {
- return getServerStream(getServers());
+
+ private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
+ final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
+ return groupId != null? stream.map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId))
+ : stream.flatMap(s -> RaftServerTestUtil.getRaftServerImpls(s).stream());
}
+
public Stream<RaftServerImpl> getServerAliveStream() {
- return getServerStream(getServers()).filter(RaftServerImpl::isAlive);
+ return getServerAliveStream(getGroupId());
+ }
+
+ private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) {
+ return getServerStream(groupId).filter(RaftServerImpl::isAlive);
}
public RaftServerProxy getServer(RaftPeerId id) {
return servers.get(id);
}
+ public RaftServerImpl getRaftServerImpl(RaftPeerId id) {
+ return getRaftServerImpl(servers.get(id));
+ }
+
+ private RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
+ return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
+ }
+
public List<RaftPeer> getPeers() {
return toRaftPeers(getServers());
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index b297a27..3d80f5e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
@@ -90,6 +91,13 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
}
+ static void setBlockTransaction(boolean block, MiniRaftCluster cluster) throws InterruptedException {
+ for (RaftServerProxy server : cluster.getServers()) {
+ final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
+ ((SimpleStateMachine4Testing)impl.getStateMachine()).setBlockTransaction(block);
+ }
+ }
+
@Test
public void testAsyncRequestSemaphore() throws Exception {
LOG.info("Running testAsyncRequestSemaphore");
@@ -103,9 +111,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMessages);
final RaftClient client = cluster.createClient();
//Set blockTransaction flag so that transaction blocks
- for (RaftServerProxy server : cluster.getServers()) {
- ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(true);
- }
+ setBlockTransaction(true, cluster);
//Send numMessages which are blocked and do not release the client semaphore permits
AtomicInteger blockedRequestsCount = new AtomicInteger();
@@ -133,9 +139,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
//Unset the blockTransaction flag so that semaphore permits can be released
- for (RaftServerProxy server : cluster.getServers()) {
- ((SimpleStateMachine4Testing) server.getImpl().getStateMachine()).setBlockTransaction(false);
- }
+ setBlockTransaction(false, cluster);
for(int i=0; i<=numMessages; i++){
futures[i].join();
}
@@ -148,7 +152,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
try {
cluster.start();
waitForLeader(cluster);
- RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 1000, cluster, LOG);
+ RaftBasicTests.runTestBasicAppendEntries(true, replication, killLeader, 100, cluster, LOG);
} finally {
cluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8c744ab..8c0def9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -176,7 +176,7 @@ public abstract class RaftBasicTests extends BaseTest {
LOG.info(cluster.printAllLogs());
for(RaftServerProxy server : cluster.getServers()) {
- final RaftServerImpl impl = server.getImpl();
+ final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
if (impl.isAlive() || replication == ReplicationLevel.ALL) {
JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
5, 1000, impl.getId() + " assertLogEntries", LOG);
@@ -420,7 +420,7 @@ public abstract class RaftBasicTests extends BaseTest {
if (c.exceptionInClientThread.get() != null) {
throw new AssertionError(c.exceptionInClientThread.get());
}
- RaftTestUtil.assertLogEntries(cluster.getServers(), c.messages);
+ RaftTestUtil.assertLogEntries(cluster, c.messages);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 53a051a..8667f50 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -52,13 +52,8 @@ import java.util.function.Predicate;
public interface RaftTestUtil {
-
Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
- static RaftServerImpl getImplAsUnchecked(RaftServerProxy proxy) {
- return JavaUtils.callAsUnchecked(proxy::getImpl);
- }
-
static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
throws InterruptedException {
return waitForLeader(cluster, false);
@@ -156,11 +151,9 @@ public interface RaftTestUtil {
}
}
- static void assertLogEntries(Collection<RaftServerProxy> servers,
- SimpleMessage... expectedMessages) {
- final int size = servers.size();
- final long count = MiniRaftCluster.getServerStream(servers)
- .filter(RaftServerImpl::isAlive)
+ static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage... expectedMessages) {
+ final int size = cluster.getServers().size();
+ final long count = cluster.getServerAliveStream()
.map(s -> s.getState().getLog())
.filter(log -> logEntriesContains(log, expectedMessages))
.count();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
index 67156a1..04a2a8d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
@@ -60,7 +61,7 @@ public class TestRaftServerLeaderElectionTimeout extends BaseTest {
}
@Before
- public void setup() {
+ public void setup() throws IOException {
Assert.assertNull(cluster.getLeader());
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 811e7de..51af8af 100644
--- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -64,7 +64,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
}
@Before
- public void setup() {
+ public void setup() throws IOException {
Assert.assertNull(cluster.getLeader());
cluster.start();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 4886bcc..54fbf5f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -101,7 +101,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
final RaftServerImpl leader = waitForLeader(cluster);
- final TimeDuration sleepTime = TimeDuration.valueOf(5, TimeUnit.SECONDS);
+ final TimeDuration sleepTime = TimeDuration.valueOf(3, TimeUnit.SECONDS);
LOG.info("sleep " + sleepTime);
sleepTime.sleep();
@@ -109,8 +109,9 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
final RaftServerProxy lastServer = i.next();
lastServer.start();
final RaftPeerId lastServerLeaderId = JavaUtils.attempt(
- () -> getLeader(lastServer.getImpl().getState()),
+ () -> getLeader(lastServer.getImpls().iterator().next().getState()),
10, 1000, "getLeaderId", LOG);
+ LOG.info(cluster.printServers());
Assert.assertEquals(leader.getId(), lastServerLeaderId);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 8a340d7..79017d4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -321,8 +321,7 @@ public abstract class RaftReconfigurationBaseTest extends BaseTest {
final RaftLog leaderLog = cluster.getLeader().getState().getLog();
for (RaftPeer newPeer : c1.newPeers) {
Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
- cluster.getServer(newPeer.getId())
- .getImpl().getState().getLog()
+ cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
.getEntries(0, Long.MAX_VALUE));
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index a72e6f5..a4ec715 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.util.JavaUtils;
import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.List;
import java.util.stream.Stream;
public class RaftServerTestUtil {
@@ -87,4 +89,12 @@ public class RaftServerTestUtil {
public static Logger getStateMachineUpdaterLog() {
return StateMachineUpdater.LOG;
}
+
+ public static List<RaftServerImpl> getRaftServerImpls(RaftServerProxy proxy) {
+ return JavaUtils.callAsUnchecked(proxy::getImpls);
+ }
+
+ public static RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
+ return JavaUtils.callAsUnchecked(() -> proxy.getImpl(groupId));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
index 251a4d3..f6e417c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ReinitializationBaseTest.java
@@ -48,6 +48,7 @@ public abstract class ReinitializationBaseTest extends BaseTest {
static final Logger LOG = LoggerFactory.getLogger(ReinitializationBaseTest.class);
{
+ LogUtils.setLogLevel(RaftServerProxy.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
}
@@ -66,7 +67,7 @@ public abstract class ReinitializationBaseTest extends BaseTest {
LOG.info("Start testReinitialize" + cluster.printServers());
// Start server with an empty conf
- final RaftGroupId groupId = RaftGroupId.randomId();
+ final RaftGroupId groupId = cluster.getGroupId();
final RaftGroup group = new RaftGroup(groupId);
final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(3, 0))
@@ -75,10 +76,10 @@ public abstract class ReinitializationBaseTest extends BaseTest {
LOG.info("putNewServer: " + cluster.printServers());
cluster.start();
- LOG.info("start: " + cluster.printServers());
// Make sure that there are no leaders.
TimeUnit.SECONDS.sleep(1);
+ LOG.info("start: " + cluster.printServers());
Assert.assertNull(cluster.getLeader());
// Reinitialize servers
@@ -128,18 +129,20 @@ public abstract class ReinitializationBaseTest extends BaseTest {
LOG.info("\n\nrunTestReinitializeMultiGroups with " + type + ": " + cluster.printServers());
// Start server with an empty conf
- final RaftGroup emptyGroup = new RaftGroup(RaftGroupId.randomId());
+ final RaftGroup emptyGroup = new RaftGroup(cluster.getGroupId());
final List<RaftPeerId> ids = Arrays.stream(MiniRaftCluster.generateIds(idIndex[idIndex.length - 1], 0))
.map(RaftPeerId::valueOf).collect(Collectors.toList());
+ LOG.info("ids: " + ids);
ids.forEach(id -> cluster.putNewServer(id, emptyGroup, true));
LOG.info("putNewServer: " + cluster.printServers());
+ TimeUnit.SECONDS.sleep(1);
cluster.start();
- LOG.info("start: " + cluster.printServers());
// Make sure that there are no leaders.
TimeUnit.SECONDS.sleep(1);
+ LOG.info("start: " + cluster.printServers());
Assert.assertNull(cluster.getLeader());
// Reinitialize servers to three groups
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
index 1be6c3a..c4a9a5e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedServerRpc.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.server.simulation;
-import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.protocol.*;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
@@ -25,6 +24,8 @@ import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,7 +126,7 @@ class SimulatedServerRpc implements RaftServerRpc {
= new RequestHandler.HandlerInterface<RaftServerRequest, RaftServerReply>() {
@Override
public boolean isAlive() {
- return RaftTestUtil.getImplAsUnchecked(server).isAlive();
+ return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
}
@Override
@@ -147,7 +148,7 @@ class SimulatedServerRpc implements RaftServerRpc {
= new RequestHandler.HandlerInterface<RaftClientRequest, RaftClientReply>() {
@Override
public boolean isAlive() {
- return RaftTestUtil.getImplAsUnchecked(server).isAlive();
+ return !server.getLifeCycleState().isOneOf(LifeCycle.State.CLOSING, LifeCycle.State.CLOSED);
}
@Override
@@ -168,7 +169,8 @@ class SimulatedServerRpc implements RaftServerRpc {
future.whenCompleteAsync((reply, exception) -> {
try {
- final IOException e = IOUtils.asIOException(exception);
+ final IOException e = exception == null? null
+ : IOUtils.asIOException(JavaUtils.unwrapCompletionException(exception));
clientHandler.getRpc().sendReply(request, reply, e);
} catch (IOException e) {
LOG.warn("Failed to send reply {} for request {} due to exception {}",
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index ef018e5..cf0f611 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.util.List;
public abstract class RaftSnapshotBaseTest extends BaseTest {
@@ -82,7 +83,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
public abstract MiniRaftCluster.Factory<?> getFactory();
@Before
- public void setup() {
+ public void setup() throws IOException {
final RaftProperties prop = new RaftProperties();
prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/acd507e6/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index a045ecd..07039b1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -27,7 +27,6 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
@@ -68,31 +67,19 @@ public class TestStateMachine extends BaseTest {
@Before
public void setup() throws IOException {
- }
+ properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SMTransactionContext.class, StateMachine.class);
- private void startCluster() {
- cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(
- NUM_SERVERS, properties);
- Assert.assertNull(getCluster().getLeader());
- getCluster().start();
+ cluster = MiniRaftClusterWithSimulatedRpc.FACTORY.newCluster(NUM_SERVERS, properties);
+ cluster.start();
}
@After
public void tearDown() {
- final MiniRaftCluster cluster = getCluster();
if (cluster != null) {
cluster.shutdown();
}
}
- public MiniRaftClusterWithSimulatedRpc getCluster() {
- return cluster;
- }
-
- public RaftProperties getProperties() {
- return properties;
- }
-
static class SMTransactionContext extends SimpleStateMachine4Testing {
public static SMTransactionContext get(RaftServerImpl s) {
return (SMTransactionContext)s.getStateMachine();
@@ -149,11 +136,6 @@ public class TestStateMachine extends BaseTest {
@Test
public void testTransactionContextIsPassedBack() throws Throwable {
// tests that the TrxContext set by the StateMachine in Leader is passed back to the SM
- properties.setClass(
- MiniRaftCluster.STATEMACHINE_CLASS_KEY,
- SMTransactionContext.class, StateMachine.class);
- startCluster();
-
int numTrx = 100;
final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numTrx);
try(final RaftClient client = cluster.createClient()) {
@@ -165,8 +147,8 @@ public class TestStateMachine extends BaseTest {
// TODO: there should be a better way to ensure all data is replicated and applied
Thread.sleep(cluster.getMaxTimeout() + 100);
- for (RaftServerProxy raftServer : cluster.getServers()) {
- final SMTransactionContext sm = SMTransactionContext.get(raftServer.getImpl());
+ for (RaftServerImpl raftServer : cluster.iterateServerImpls()) {
+ final SMTransactionContext sm = SMTransactionContext.get(raftServer);
sm.rethrowIfException();
assertEquals(numTrx, sm.numApplied.get());
}