You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/11 07:56:54 UTC
[incubator-ratis] branch master updated: RATIS-921. Fix resource leak by closing thousands of gRPC clients after use (#99)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 965bcec RATIS-921. Fix resource leak by closing thousands of gRPC clients after use (#99)
965bcec is described below
commit 965bcec41ccc36056f029becb743d8f24b420d76
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Jun 11 15:56:44 2020 +0800
RATIS-921. Fix resource leak by closing thousands of gRPC clients after use (#99)
---
.../main/java/org/apache/ratis/grpc/GrpcUtil.java | 35 +++
.../grpc/client/GrpcClientProtocolClient.java | 8 +-
.../grpc/server/GrpcServerProtocolClient.java | 9 +-
.../ratis/logservice/impl/LogStreamImpl.java | 1 +
.../apache/ratis/logservice/server/LogServer.java | 1 +
.../ratis/logservice/server/LogStateMachine.java | 7 +
.../ratis/logservice/server/MetaStateMachine.java | 38 +--
.../ratis/logservice/LogServiceReadWriteBase.java | 52 ++--
.../ratis/logservice/server/TestMetaServer.java | 300 ++++++++++++---------
.../java/org/apache/ratis/LogAppenderTests.java | 46 ++--
.../test/java/org/apache/ratis/RaftAsyncTests.java | 81 +++---
.../java/org/apache/ratis/RetryCacheTests.java | 96 +++----
.../ratis/server/impl/GroupManagementBaseTest.java | 28 +-
.../server/impl/RaftReconfigurationBaseTest.java | 201 +++++++-------
.../impl/RaftStateMachineExceptionTests.java | 108 ++++----
.../server/impl/StateMachineShutdownTests.java | 77 +++---
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 51 ++--
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 57 ++--
.../ratis/retry/TestExceptionDependentRetry.java | 11 +-
.../apache/ratis/server/ServerRestartTests.java | 18 +-
.../ratis/statemachine/TestStateMachine.java | 7 +-
21 files changed, 667 insertions(+), 565 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 0856a42..28b7cc7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.ServerNotReadyException;
import org.apache.ratis.protocol.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.Metadata;
import org.apache.ratis.thirdparty.io.grpc.Status;
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
@@ -30,13 +31,17 @@ import org.apache.ratis.util.LogUtils;
import org.apache.ratis.util.ReflectionUtils;
import org.apache.ratis.util.function.CheckedSupplier;
import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
public interface GrpcUtil {
+ static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
+
Metadata.Key<String> EXCEPTION_TYPE_KEY =
Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
Metadata.Key<byte[]> EXCEPTION_OBJECT_KEY =
@@ -198,4 +203,34 @@ public interface GrpcUtil {
return trailers;
}
}
+
+ /**
+ * Tries to gracefully shut down the managed channel. Falls back to forceful shutdown if
+ * graceful shutdown times out.
+ */
+ static void shutdownManagedChannel(ManagedChannel managedChannel) {
+ // Close the gRPC managed-channel if not shut down already.
+ if (!managedChannel.isShutdown()) {
+ try {
+ managedChannel.shutdown();
+ if (!managedChannel.awaitTermination(3, TimeUnit.SECONDS)) {
+ LOG.warn("Timed out gracefully shutting down connection: {}. ", managedChannel);
+ }
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while waiting for channel termination", e);
+ }
+ }
+
+ // Forceful shut down if still not terminated.
+ if (!managedChannel.isTerminated()) {
+ try {
+ managedChannel.shutdownNow();
+ if (!managedChannel.awaitTermination(2, TimeUnit.SECONDS)) {
+ LOG.warn("Timed out forcefully shutting down connection: {}. ", managedChannel);
+ }
+ } catch (Exception e) {
+ LOG.error("Unexpected exception while waiting for channel termination", e);
+ }
+ }
+ }
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index be9c7b2..1d6860a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -66,7 +66,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -143,12 +142,7 @@ public class GrpcClientProtocolClient implements Closeable {
public void close() {
Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
- channel.shutdown();
- try {
- channel.awaitTermination(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Unexpected exception while waiting for channel termination", e);
- }
+ GrpcUtil.shutdownManagedChannel(channel);
scheduler.close();
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index cb46f99..399089a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -18,6 +18,7 @@
package org.apache.ratis.grpc.server;
import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -35,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
/**
* This is a RaftClient implementation that supports streaming data to the raft
@@ -89,12 +89,7 @@ public class GrpcServerProtocolClient implements Closeable {
@Override
public void close() {
- channel.shutdown();
- try {
- channel.awaitTermination(5, TimeUnit.SECONDS);
- } catch (Exception e) {
- LOG.error("Unexpected exception while waiting for channel termination, peerId={}", raftPeerId, e);
- }
+ GrpcUtil.shutdownManagedChannel(channel);
}
public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 6f73fa4..7980de7 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -192,6 +192,7 @@ public class LogStreamImpl implements LogStream {
@Override
public void close() throws Exception {
// TODO Auto-generated method stub
+ raftClient.close();
state = State.CLOSED;
}
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index 22ac6cc..b68a763 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -176,6 +176,7 @@ public class LogServer extends BaseServer {
public void close() throws IOException {
+ metaClient.close();
raftServer.close();
daemon.interrupt();
}
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 3a0d779..02e9234 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -485,6 +485,13 @@ public class LogStateMachine extends BaseStateMachine {
public void close() {
reset();
logServiceMetrics.unregister();
+ if (client != null) {
+ try {
+ client.close();
+ } catch (Exception ignored) {
+ LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+ }
+ }
}
@Override
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 1313c32..a0ba191 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -261,21 +261,15 @@ public class MetaStateMachine extends BaseStateMachine {
} else {
Collection<RaftPeer> raftPeers = raftGroup.getPeers();
raftPeers.stream().forEach(peer -> {
- RaftClient client = RaftClient.newBuilder().setProperties(properties)
- .setClientId(ClientId.randomId())
- .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build();
- try {
+ try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
+ .setClientId(ClientId.randomId()).setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()){
client.groupRemove(raftGroup.getGroupId(), true, peer.getId());
} catch (IOException e) {
e.printStackTrace();
}
});
- RaftClient client = RaftClient.newBuilder()
- .setRaftGroup(currentGroup)
- .setClientId(ClientId.randomId())
- .setProperties(properties)
- .build();
- try {
+ try (RaftClient client = RaftClient.newBuilder().setRaftGroup(currentGroup)
+ .setClientId(ClientId.randomId()).setProperties(properties).build()){
client.send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder()
.setUnregisterRequest(
LogServiceUnregisterLogRequestProto.newBuilder()
@@ -323,9 +317,8 @@ public class MetaStateMachine extends BaseStateMachine {
int provisionedPeers = 0;
Exception originalException = null;
for (RaftPeer peer : peers) {
- RaftClient client = RaftClient.newBuilder().setProperties(properties)
- .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build();
- try {
+ try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
+ .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
client.groupAdd(raftGroup, peer.getId());
} catch (IOException e) {
LOG.error("Failed to add Raft group ({}) for new Log({})",
@@ -343,9 +336,8 @@ public class MetaStateMachine extends BaseStateMachine {
if (tornDownPeers >= provisionedPeers) {
break;
}
- RaftClient client = RaftClient.newBuilder().setProperties(properties)
- .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build();
- try {
+ try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
+ .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
client.groupRemove(raftGroup.getGroupId(), true, peer.getId());
} catch (IOException e) {
LOG.error("Failed to clean up Raft group ({}) for peer ({}), "
@@ -358,12 +350,8 @@ public class MetaStateMachine extends BaseStateMachine {
MetaServiceProtoUtil.toCreateLogExceptionReplyProto(originalException)
.build().toByteString()));
}
- RaftClient client = RaftClient.newBuilder()
- .setRaftGroup(currentGroup)
- .setClientId(ClientId.randomId())
- .setProperties(properties)
- .build();
- try {
+ try (RaftClient client = RaftClient.newBuilder().setRaftGroup(currentGroup)
+ .setClientId(ClientId.randomId()).setProperties(properties).build()){
client.send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder()
.setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
.setLogname(LogServiceProtoUtil.toLogNameProto(name))
@@ -458,9 +446,8 @@ public class MetaStateMachine extends BaseStateMachine {
while(itr.hasNext()) {
LogName logName = itr.next();
RaftGroup group = map.get(logName);
- RaftClient client = RaftClient.newBuilder().
- setRaftGroup(group).setProperties(properties).build();
- try {
+ try (RaftClient client = RaftClient.newBuilder()
+ .setRaftGroup(group).setProperties(properties).build()) {
LOG.warn(String.format("Peer %s in the group %s went down." +
" Hence closing the log %s serve by the group.",
raftPeer.toString(), group.toString(), logName.toString()));
@@ -475,7 +462,6 @@ public class MetaStateMachine extends BaseStateMachine {
throw new IOException(message.getException().getErrorMsg());
}
itr.remove();
- client.close();
} catch (IOException e) {
LOG.warn(String.format("Failed to close log %s on peer %s failure.",
logName, raftPeer.toString()), e);
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index c579990..84aba03 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -125,12 +125,12 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
@Test
public void testLogServiceReadWrite() throws Exception {
- RaftClient raftClient =
- RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
- .build();
- LogName logName = LogName.of("log1");
- // TODO need API to circumvent metadata service for testing
- try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+ try (RaftClient raftClient =
+ RaftClient.newBuilder().setProperties(getProperties())
+ .setRaftGroup(cluster.getGroup()).build()) {
+ LogName logName = LogName.of("log1");
+ // TODO need API to circumvent metadata service for testing
+ LogStream logStream = new MetricLogStream(logName, raftClient);
assertEquals("log1", logStream.getName().getName());
assertEquals(State.OPEN, logStream.getState());
assertEquals(0, logStream.getSize());
@@ -181,13 +181,13 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
@Test
public void testReadAllRecords() throws Exception {
- final RaftClient raftClient =
- RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
- .build();
- final LogName logName = LogName.of("log1");
- final int numRecords = 25;
- // TODO need API to circumvent metadata service for testing
- try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+ try (RaftClient raftClient =
+ RaftClient.newBuilder().setProperties(getProperties())
+ .setRaftGroup(cluster.getGroup()).build()) {
+ final LogName logName = LogName.of("log1");
+ final int numRecords = 25;
+ // TODO need API to circumvent metadata service for testing
+ LogStream logStream = new MetricLogStream(logName, raftClient);
try (LogWriter writer = logStream.createWriter()) {
LOG.info("Writing {} records", numRecords);
// Write records 0 through 99 (inclusive)
@@ -222,13 +222,13 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
@Test
public void testSeeking() throws Exception {
- final RaftClient raftClient =
- RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
- .build();
- final LogName logName = LogName.of("log1");
- final int numRecords = 100;
- // TODO need API to circumvent metadata service for testing
- try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+ try (final RaftClient raftClient =
+ RaftClient.newBuilder().setProperties(getProperties())
+ .setRaftGroup(cluster.getGroup()).build()) {
+ final LogName logName = LogName.of("log1");
+ final int numRecords = 100;
+ // TODO need API to circumvent metadata service for testing
+ LogStream logStream = new MetricLogStream(logName, raftClient);
try (LogWriter writer = logStream.createWriter()) {
LOG.info("Writing {} records", numRecords);
// Write records 0 through 99 (inclusive)
@@ -259,12 +259,12 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
@Test
public void testSeekFromWrite() throws Exception {
- final RaftClient raftClient =
- RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
- .build();
- final LogName logName = LogName.of("log1");
- final int numRecords = 10;
- try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+ try (final RaftClient raftClient =
+ RaftClient.newBuilder().setProperties(getProperties())
+ .setRaftGroup(cluster.getGroup()).build()) {
+ final LogName logName = LogName.of("log1");
+ final int numRecords = 10;
+ LogStream logStream = new MetricLogStream(logName, raftClient);
final List<Long> recordIds;
try (LogWriter writer = logStream.createWriter()) {
LOG.info("Writing {} records", numRecords);
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 03c6e80..c911fad 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -39,6 +39,8 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
@@ -55,6 +57,8 @@ import javax.management.ObjectName;
import static org.junit.Assert.*;
public class TestMetaServer {
+ private static final Logger LOG = LoggerFactory.getLogger(TestMetaServer.class);
+
static {
JVMMetrics.initJvmMetrics(TimeDuration.valueOf(10, TimeUnit.SECONDS));
}
@@ -129,10 +133,12 @@ public class TestMetaServer {
@Test
public void testCreateAndGetLog() throws Exception {
// This should be LogServiceStream ?
- LogStream logStream1 = client.createLog(LogName.of("testCreateLog"));
- assertNotNull(logStream1);
- LogStream logStream2 = client.getLog(LogName.of("testCreateLog"));
- assertNotNull(logStream2);
+ try (LogStream logStream1 = client.createLog(LogName.of("testCreateLog"))) {
+ assertNotNull(logStream1);
+ }
+ try (LogStream logStream2 = client.getLog(LogName.of("testCreateLog"))) {
+ assertNotNull(logStream2);
+ }
}
/**
@@ -144,8 +150,9 @@ public class TestMetaServer {
boolean peerClosed = false;
try {
for(int i = 0; i < 5; i++) {
- LogStream logStream1 = client.createLog(LogName.of("testCloseLogOnNodeFailure"+i));
- assertNotNull(logStream1);
+ try (LogStream logStream1 = client.createLog(LogName.of("testCloseLogOnNodeFailure"+i))) {
+ assertNotNull(logStream1);
+ }
}
assertTrue(((MetaStateMachine)cluster.getMasters().get(0).getMetaStateMachine()).checkPeersAreSame());
workers.get(0).close();
@@ -153,9 +160,10 @@ public class TestMetaServer {
Thread.sleep(90000);
assertTrue(((MetaStateMachine)cluster.getMasters().get(0).getMetaStateMachine()).checkPeersAreSame());
for(int i = 0; i < 5; i++) {
- LogStream logStream2 = client.getLog(LogName.of("testCloseLogOnNodeFailure"+i));
- assertNotNull(logStream2);
- assertEquals(State.CLOSED, logStream2.getState());
+ try (LogStream logStream2 = client.getLog(LogName.of("testCloseLogOnNodeFailure"+i))) {
+ assertNotNull(logStream2);
+ assertEquals(State.CLOSED, logStream2.getState());
+ }
}
} finally {
if(peerClosed) {
@@ -166,108 +174,111 @@ public class TestMetaServer {
}
@Test
- public void testReadWritetoLog() throws IOException, InterruptedException {
- LogStream stream = client.createLog(LogName.of("testReadWrite"));
- LogWriter writer = stream.createWriter();
- ByteBuffer testMessage = ByteBuffer.wrap("Hello world!".getBytes());
- List<LogInfo> listLogs = client.listLogs();
- assert(listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1);
- List<LogServer> workers = cluster.getWorkers();
- for(LogServer worker : workers) {
- RaftServerImpl server = ((RaftServerProxy)worker.getServer())
- .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
- // TODO: perform all additional checks on state machine level
- }
- writer.write(testMessage);
- for(LogServer worker : workers) {
- RaftServerImpl server = ((RaftServerProxy)worker.getServer())
- .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
- }
+ public void testReadWritetoLog() throws Exception {
+ try (LogStream stream = client.createLog(LogName.of("testReadWrite"))) {
+ LogWriter writer = stream.createWriter();
+ ByteBuffer testMessage = ByteBuffer.wrap("Hello world!".getBytes());
+ List<LogInfo> listLogs = client.listLogs();
+ assert (listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1);
+ List<LogServer> workers = cluster.getWorkers();
+ for (LogServer worker : workers) {
+ RaftServerImpl server = ((RaftServerProxy) worker.getServer())
+ .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
+ // TODO: perform all additional checks on state machine level
+ }
+ writer.write(testMessage);
+ for (LogServer worker : workers) {
+ RaftServerImpl server = ((RaftServerProxy) worker.getServer())
+ .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
+ }
// assert(stream.getSize() > 0); //TODO: Doesn't work
- LogReader reader = stream.createReader();
- ByteBuffer res = reader.readNext();
- assert(res.array().length > 0);
+ LogReader reader = stream.createReader();
+ ByteBuffer res = reader.readNext();
+ assert (res.array().length > 0);
+ }
}
@Test
- public void testLogArchival() throws IOException, InterruptedException {
+ public void testLogArchival() throws Exception {
LogName logName = LogName.of("testArchivalLog");
- LogStream logStream = client.createLog(logName);
- LogWriter writer = logStream.createWriter();
- List<LogInfo> listLogs = client.listLogs();
- assert (listLogs.stream()
- .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
- List<LogServer> workers = cluster.getWorkers();
- List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
- writer.write(records);
- client.closeLog(logName);
- assertEquals(logStream.getState(), State.CLOSED);
- client.archiveLog(logName);
- int retry = 0;
- while (logStream.getState() != State.ARCHIVED && retry <= 40) {
- Thread.sleep(1000);
- retry++;
+ try (LogStream logStream = client.createLog(logName)) {
+ LogWriter writer = logStream.createWriter();
+ List<LogInfo> listLogs = client.listLogs();
+ assert (listLogs.stream()
+ .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+ List<LogServer> workers = cluster.getWorkers();
+ List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+ writer.write(records);
+ client.closeLog(logName);
+ assertEquals(logStream.getState(), State.CLOSED);
+ client.archiveLog(logName);
+ int retry = 0;
+ while (logStream.getState() != State.ARCHIVED && retry <= 40) {
+ Thread.sleep(1000);
+ retry++;
+ }
+ assertEquals(logStream.getState(), State.ARCHIVED);
+ LogReader reader = logStream.createReader();
+ List<ByteBuffer> data = reader.readBulk(records.size());
+ assertEquals(records.size(), data.size());
+ reader.seek(1);
+ data = reader.readBulk(records.size());
+ assertEquals(records.size() - 1, data.size());
+
+ //Test ArchiveLogStream
+ LogServiceConfiguration config = LogServiceConfiguration.create();
+ LogStream archiveLogStream = client.getArchivedLog(logName);
+ reader = archiveLogStream.createReader();
+ data = reader.readBulk(records.size());
+ assertEquals(records.size(), data.size());
}
- assertEquals(logStream.getState(), State.ARCHIVED);
- LogReader reader = logStream.createReader();
- List<ByteBuffer> data = reader.readBulk(records.size());
- assertEquals(records.size(), data.size());
- reader.seek(1);
- data = reader.readBulk(records.size());
- assertEquals(records.size() - 1, data.size());
-
- //Test ArchiveLogStream
- LogServiceConfiguration config = LogServiceConfiguration.create();
- LogStream archiveLogStream = client.getArchivedLog(logName);
- reader = archiveLogStream.createReader();
- data = reader.readBulk(records.size());
- assertEquals(records.size(), data.size());
}
@Test
- public void testLogExport() throws IOException, InterruptedException {
+ public void testLogExport() throws Exception {
LogName logName = LogName.of("testLogExport");
- LogStream logStream = client.createLog(logName);
- LogWriter writer = logStream.createWriter();
- List<LogInfo> listLogs = client.listLogs();
- assert (listLogs.stream()
- .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
- List<LogServer> workers = cluster.getWorkers();
- List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
- writer.write(records);
- String location1 = "target/tmp/export_1/";
- String location2 = "target/tmp/export_2/";
- deleteLocalDirectory(new File(location1));
- deleteLocalDirectory(new File(location2));
- int startPosition1 = 3;
- int startPosition2 = 5;
- client.exportLog(logName, location1, startPosition1);
- client.exportLog(logName, location2, startPosition2);
- List<ArchivalInfo> infos=client.getExportStatus(logName);
- int count=0;
- while (infos.size() > 1 && (
- infos.get(0).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED
- || infos.get(1).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED)
- && count < 10) {
- infos = client.getExportStatus(logName);
- ;
- Thread.sleep(1000);
- count++;
+ try (LogStream logStream = client.createLog(logName)) {
+ LogWriter writer = logStream.createWriter();
+ List<LogInfo> listLogs = client.listLogs();
+ assert (listLogs.stream()
+ .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+ List<LogServer> workers = cluster.getWorkers();
+ List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+ writer.write(records);
+ String location1 = "target/tmp/export_1/";
+ String location2 = "target/tmp/export_2/";
+ deleteLocalDirectory(new File(location1));
+ deleteLocalDirectory(new File(location2));
+ int startPosition1 = 3;
+ int startPosition2 = 5;
+ client.exportLog(logName, location1, startPosition1);
+ client.exportLog(logName, location2, startPosition2);
+ List<ArchivalInfo> infos = client.getExportStatus(logName);
+ int count = 0;
+ while (infos.size() > 1 && (
+ infos.get(0).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED
+ || infos.get(1).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED)
+ && count < 10) {
+ infos = client.getExportStatus(logName);
+ ;
+ Thread.sleep(1000);
+ count++;
- }
+ }
- //Test ExportLogStream
- LogStream exportLogStream = client.getExportLog(logName, location1);
- LogReader reader = exportLogStream.createReader();
- List<ByteBuffer> data = reader.readBulk(records.size());
- assertEquals(records.size() - startPosition1, data.size());
- reader.close();
- exportLogStream = client.getExportLog(logName, location2);
- reader = exportLogStream.createReader();
- data = reader.readBulk(records.size());
- assertEquals(records.size() - startPosition2, data.size());
- reader.close();
- writer.close();
+ //Test ExportLogStream
+ LogStream exportLogStream = client.getExportLog(logName, location1);
+ LogReader reader = exportLogStream.createReader();
+ List<ByteBuffer> data = reader.readBulk(records.size());
+ assertEquals(records.size() - startPosition1, data.size());
+ reader.close();
+ exportLogStream = client.getExportLog(logName, location2);
+ reader = exportLogStream.createReader();
+ data = reader.readBulk(records.size());
+ assertEquals(records.size() - startPosition2, data.size());
+ reader.close();
+ writer.close();
+ }
}
boolean deleteLocalDirectory(File dir) {
@@ -289,31 +300,40 @@ public class TestMetaServer {
@Test
public void testDeleteLog() throws Exception {
// This should be LogServiceStream ?
- LogStream logStream1 = client.createLog(LogName.of("testDeleteLog"));
- assertNotNull(logStream1);
- client.deleteLog(LogName.of("testDeleteLog"));
- testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.DELETELOG.name(),
- (long) deleteCount.get());
- try {
- logStream1 = client.getLog(LogName.of("testDeleteLog"));
- fail("Failed to throw LogNotFoundException");
- } catch(Exception e) {
- assert(e instanceof LogNotFoundException);
+ try (LogStream logStream1 = client.createLog(LogName.of("testDeleteLog"))) {
+ assertNotNull(logStream1);
+ client.deleteLog(LogName.of("testDeleteLog"));
+ testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.DELETELOG.name(),
+ (long) deleteCount.get());
+ LogStream logStream2 = null;
+ try {
+ logStream2 = client.getLog(LogName.of("testDeleteLog"));
+ fail("Failed to throw LogNotFoundException");
+ } catch (Exception e) {
+ assert (e instanceof LogNotFoundException);
+ } finally {
+ if (logStream2 != null) {
+ logStream2.close();
+ }
+ }
}
-
-
}
/**
* Test for getting not existing log. Should throw an exception
* @throws IOException
*/
@Test
- public void testGetNotExistingLog() {
+ public void testGetNotExistingLog() throws Exception {
+ LogStream logStream = null;
try {
- LogStream log = client.getLog(LogName.of("no_such_log"));
+ logStream = client.getLog(LogName.of("no_such_log"));
fail("LogNotFoundException was not thrown");
} catch (IOException e) {
assert(e instanceof LogNotFoundException);
+ } finally {
+ if (logStream != null) {
+ logStream.close();
+ }
}
}
@@ -323,13 +343,30 @@ public class TestMetaServer {
*/
@Test
public void testAlreadyExistLog() throws Exception {
- LogStream logStream1 = client.createLog(LogName.of("test1"));
- assertNotNull(logStream1);
+ try (LogStream logStream1 = client.createLog(LogName.of("test1"))) {
+ assertNotNull(logStream1);
+ LogStream logStream2 = null;
+ try {
+ logStream2 = client.createLog(LogName.of("test1"));
+ fail("Didn't fail with LogAlreadyExistException");
+ } catch (IOException e) {
+ assert (e instanceof LogAlreadyExistException);
+ } finally {
+ if (logStream2 != null) {
+ logStream2.close();
+ }
+ }
+ }
+ }
+
+ private void createLogAndClose(String name) throws Exception {
+ LogStream logStream = null;
try {
- logStream1 = client.createLog(LogName.of("test1"));
- fail("Didn't fail with LogAlreadyExistException");
- } catch (IOException e) {
- assert(e instanceof LogAlreadyExistException);
+ logStream = client.createLog(LogName.of(name));
+ } finally {
+ if (logStream != null) {
+ logStream.close();
+ }
}
}
@@ -339,13 +376,13 @@ public class TestMetaServer {
*/
@Test
public void testListLogs() throws Exception {
- client.createLog(LogName.of("listLogTest1"));
- client.createLog(LogName.of("listLogTest2"));
- client.createLog(LogName.of("listLogTest3"));
- client.createLog(LogName.of("listLogTest4"));
- client.createLog(LogName.of("listLogTest5"));
- client.createLog(LogName.of("listLogTest6"));
- client.createLog(LogName.of("listLogTest7"));
+ createLogAndClose("listLogTest1");
+ createLogAndClose("listLogTest2");
+ createLogAndClose("listLogTest3");
+ createLogAndClose("listLogTest4");
+ createLogAndClose("listLogTest5");
+ createLogAndClose("listLogTest6");
+ createLogAndClose("listLogTest7");
// Test jmx
List<LogInfo> list = client.listLogs();
@@ -380,10 +417,19 @@ public class TestMetaServer {
@Test
public void testFinalClieanUp() throws Exception {
IntStream.range(0, 10).forEach(i -> {
+ LogStream logStream = null;
try {
- client.createLog(LogName.of("CleanTest" + i));
+ logStream = client.createLog(LogName.of("CleanTest" + i));
} catch (IOException e) {
throw new RuntimeException(e);
+ } finally {
+ if (logStream != null) {
+ try {
+ logStream.close();
+ } catch (Exception ignored) {
+ LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+ }
+ }
}
});
List<LogInfo> list = client.listLogs();
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 30a1452..2f87cd8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -43,6 +43,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
@@ -51,8 +52,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import com.codahale.metrics.Gauge;
@@ -172,23 +171,40 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
final int numMsgs = 10;
final int numClients = 5;
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+ List<RaftClient> clients = new ArrayList<>();
- // start several clients and write concurrently
- final CountDownLatch latch = new CountDownLatch(1);
- final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients)
- .map(i -> new Sender(cluster.createClient(leaderId), numMsgs, latch))
- .collect(Collectors.toList());
- senders.forEach(Thread::start);
+ try {
+ List<Sender> senders = new ArrayList<>();
- latch.countDown();
+ // start several clients and write concurrently
+ final CountDownLatch latch = new CountDownLatch(1);
- for (Sender s : senders) {
- s.join();
- final Exception e = s.exception.get();
- if (e != null) {
- throw e;
+ for (int i = 0; i < numClients; i ++) {
+ RaftClient client = cluster.createClient(leaderId);
+ clients.add(client);
+ senders.add(new Sender(client, numMsgs, latch));
+ }
+
+ senders.forEach(Thread::start);
+
+ latch.countDown();
+
+ for (Sender s : senders) {
+ s.join();
+ final Exception e = s.exception.get();
+ if (e != null) {
+ throw e;
+ }
+ Assert.assertTrue(s.succeed.get());
+ }
+ } finally {
+ for (int i = 0; i < clients.size(); i ++) {
+ try {
+ clients.get(i).close();
+ } catch (Exception ignored) {
+ LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+ }
}
- Assert.assertTrue(s.succeed.get());
}
final ServerState leaderState = cluster.getLeader().getState();
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 6f44bfc..2a6b463 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -199,47 +199,48 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
int numMessages = RaftClientConfigKeys.Async.outstandingRequestsMax(getProperties());
CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
final SimpleMessage[] messages = SimpleMessage.create(numMessages);
- final RaftClient client = cluster.createClient();
- //Set blockTransaction flag so that transaction blocks
- cluster.getServers().stream()
- .map(cluster::getRaftServerImpl)
- .map(SimpleStateMachine4Testing::get)
- .forEach(SimpleStateMachine4Testing::blockStartTransaction);
-
- //Send numMessages which are blocked and do not release the client semaphore permits
- AtomicInteger blockedRequestsCount = new AtomicInteger();
- for (int i=0; i<numMessages; i++) {
- blockedRequestsCount.getAndIncrement();
- futures[i] = client.sendAsync(messages[i]);
- blockedRequestsCount.decrementAndGet();
- }
- Assert.assertEquals(0, blockedRequestsCount.get());
-
- futures[numMessages] = CompletableFuture.supplyAsync(() -> {
- blockedRequestsCount.incrementAndGet();
- client.sendAsync(new SimpleMessage("n1"));
- blockedRequestsCount.decrementAndGet();
- return null;
- });
-
- //Allow the last msg to be sent
- while (blockedRequestsCount.get() != 1) {
- Thread.sleep(1000);
- }
- Assert.assertEquals(1, blockedRequestsCount.get());
- //Since all semaphore permits are acquired the last message sent is in queue
- RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
-
- //Unset the blockTransaction flag so that semaphore permits can be released
- cluster.getServers().stream()
- .map(cluster::getRaftServerImpl)
- .map(SimpleStateMachine4Testing::get)
- .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
-
- for(int i=0; i<=numMessages; i++){
- futures[i].join();
+ try (final RaftClient client = cluster.createClient()) {
+ //Set blockTransaction flag so that transaction blocks
+ cluster.getServers().stream()
+ .map(cluster::getRaftServerImpl)
+ .map(SimpleStateMachine4Testing::get)
+ .forEach(SimpleStateMachine4Testing::blockStartTransaction);
+
+ //Send numMessages which are blocked and do not release the client semaphore permits
+ AtomicInteger blockedRequestsCount = new AtomicInteger();
+ for (int i = 0; i < numMessages; i++) {
+ blockedRequestsCount.getAndIncrement();
+ futures[i] = client.sendAsync(messages[i]);
+ blockedRequestsCount.decrementAndGet();
+ }
+ Assert.assertEquals(0, blockedRequestsCount.get());
+
+ futures[numMessages] = CompletableFuture.supplyAsync(() -> {
+ blockedRequestsCount.incrementAndGet();
+ client.sendAsync(new SimpleMessage("n1"));
+ blockedRequestsCount.decrementAndGet();
+ return null;
+ });
+
+ //Allow the last msg to be sent
+ while (blockedRequestsCount.get() != 1) {
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(1, blockedRequestsCount.get());
+ //Since all semaphore permits are acquired the last message sent is in queue
+ RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
+
+ //Unset the blockTransaction flag so that semaphore permits can be released
+ cluster.getServers().stream()
+ .map(cluster::getRaftServerImpl)
+ .map(SimpleStateMachine4Testing::get)
+ .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
+
+ for (int i = 0; i <= numMessages; i++) {
+ futures[i].join();
+ }
+ Assert.assertEquals(0, blockedRequestsCount.get());
}
- Assert.assertEquals(0, blockedRequestsCount.get());
}
void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 37d4e99..e3eec8c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -65,20 +65,20 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId();
long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
- final RaftClient client = cluster.createClient(leaderId);
- final RaftClientRpc rpc = client.getClientRpc();
- final long callId = 999;
- RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, new SimpleMessage("message"));
- assertReply(rpc.sendRequest(r), client, callId);
-
- // retry with the same callId
- for (int i = 0; i < 5; i++) {
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ final RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, new SimpleMessage("message"));
assertReply(rpc.sendRequest(r), client, callId);
- }
- assertServer(cluster, client.getId(), callId, oldLastApplied);
- client.close();
+ // retry with the same callId
+ for (int i = 0; i < 5; i++) {
+ assertReply(rpc.sendRequest(r), client, callId);
+ }
+
+ assertServer(cluster, client.getId(), callId, oldLastApplied);
+ }
}
public static RaftClient assertReply(RaftClientReply reply, RaftClient client, long callId) {
@@ -126,43 +126,43 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId();
- final RaftClient client = cluster.createClient(leaderId);
- RaftClientRpc rpc = client.getClientRpc();
- final long callId = 999;
- RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, new SimpleMessage("message"));
- assertReply(rpc.sendRequest(r), client, callId);
- long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
-
- // trigger the reconfiguration, make sure the original leader is kicked out
- PeerChanges change = cluster.addNewPeers(2, true);
- RaftPeer[] allPeers = cluster.removePeers(2, true,
- asList(change.newPeers)).allPeersInNewConf;
- // trigger setConfiguration
- cluster.setConfiguration(allPeers);
-
- final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
- final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
- Assert.assertNotEquals(leaderId, id);
- return id;
- }, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "wait for a leader different than " + leaderId, LOG);
- Assert.assertNotEquals(leaderId, newLeaderId);
- // same clientId and callId in the request
- r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
- callId, new SimpleMessage("message"));
- rpc.addServers(Arrays.asList(change.newPeers));
- for (int i = 0; i < 10; i++) {
- try {
- assertReply(rpc.sendRequest(r), client, callId);
- LOG.info("successfully sent out the retry request_" + i);
- } catch (Exception e) {
- LOG.info("hit exception while retrying the same request: " + r, e);
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, new SimpleMessage("message"));
+ assertReply(rpc.sendRequest(r), client, callId);
+ long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+ // trigger the reconfiguration, make sure the original leader is kicked out
+ PeerChanges change = cluster.addNewPeers(2, true);
+ RaftPeer[] allPeers = cluster.removePeers(2, true,
+ asList(change.newPeers)).allPeersInNewConf;
+ // trigger setConfiguration
+ cluster.setConfiguration(allPeers);
+
+ final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
+ final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
+ Assert.assertNotEquals(leaderId, id);
+ return id;
+ }, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "wait for a leader different than " + leaderId, LOG);
+ Assert.assertNotEquals(leaderId, newLeaderId);
+ // same clientId and callId in the request
+ r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
+ callId, new SimpleMessage("message"));
+ rpc.addServers(Arrays.asList(change.newPeers));
+ for (int i = 0; i < 10; i++) {
+ try {
+ assertReply(rpc.sendRequest(r), client, callId);
+ LOG.info("successfully sent out the retry request_" + i);
+ } catch (Exception e) {
+ LOG.info("hit exception while retrying the same request: " + r, e);
+ }
+ Thread.sleep(100);
}
- Thread.sleep(100);
- }
- // check the new leader and make sure the retry did not get committed
- Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
- client.close();
+ // check the new leader and make sure the retry did not get committed
+ Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
+ }
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 0ad0906..acc26ab 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -85,9 +85,10 @@ public abstract class GroupManagementBaseTest extends BaseTest {
// Add groups
final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), cluster.getPeers());
LOG.info("add new group: " + newGroup);
- final RaftClient client = cluster.createClient(newGroup);
- for(RaftPeer p : newGroup.getPeers()) {
- client.groupAdd(newGroup, p.getId());
+ try (final RaftClient client = cluster.createClient(newGroup)) {
+ for (RaftPeer p : newGroup.getPeers()) {
+ client.groupAdd(newGroup, p.getId());
+ }
}
Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
TimeUnit.SECONDS.sleep(1);
@@ -241,16 +242,17 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final RaftPeer peer = cluster.getPeers().get(0);
final RaftPeerId peerId = peer.getId();
final RaftGroup group = RaftGroup.valueOf(cluster.getGroupId(), peer);
- final RaftClient client = cluster.createClient();
- Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
- try {
- client.groupAdd(group, peer.getId());
- } catch (IOException ex) {
- // HadoopRPC throws RemoteException, which makes it hard to check if
- // the exception is instance of AlreadyExistsException
- Assert.assertTrue(ex.toString().contains(AlreadyExistsException.class.getCanonicalName()));
+ try (final RaftClient client = cluster.createClient()) {
+ Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
+ try {
+ client.groupAdd(group, peer.getId());
+ } catch (IOException ex) {
+ // HadoopRPC throws RemoteException, which makes it hard to check if
+ // the exception is instance of AlreadyExistsException
+ Assert.assertTrue(ex.toString().contains(AlreadyExistsException.class.getCanonicalName()));
+ }
+ Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
+ cluster.shutdown();
}
- Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
- cluster.shutdown();
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index f7545bc..1f83cac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -142,63 +142,63 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
void runTestReconfTwice(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
- final RaftClient client = cluster.createClient(leaderId);
+ try (final RaftClient client = cluster.createClient(leaderId)) {
- // submit some msgs before reconf
- for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
- Assert.assertTrue(reply.isSuccess());
- }
-
- final AtomicBoolean reconf1 = new AtomicBoolean(false);
- final AtomicBoolean reconf2 = new AtomicBoolean(false);
- final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null);
- final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null);
- CountDownLatch latch = new CountDownLatch(1);
- Thread clientThread = new Thread(() -> {
- try {
- PeerChanges c1 = cluster.addNewPeers(2, true);
- LOG.info("Start changing the configuration: {}",
- asList(c1.allPeersInNewConf));
-
- RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
- reconf1.set(reply.isSuccess());
-
- PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
- finalPeers.set(c2.allPeersInNewConf);
- deadPeers.set(c2.removedPeers);
-
- LOG.info("Start changing the configuration again: {}",
- asList(c2.allPeersInNewConf));
- reply = client.setConfiguration(c2.allPeersInNewConf);
- reconf2.set(reply.isSuccess());
-
- latch.countDown();
- client.close();
- } catch(Exception ignored) {
- LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+ // submit some msgs before reconf
+ for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
+ RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
}
- });
- clientThread.start();
- latch.await();
- Assert.assertTrue(reconf1.get());
- Assert.assertTrue(reconf2.get());
- waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
- final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId();
-
- // check configuration manager's internal state
- // each reconf will generate two configurations: (old, new) and (new)
- cluster.getServerAliveStream().forEach(server -> {
- ConfigurationManager confManager =
- (ConfigurationManager) Whitebox.getInternalState(server.getState(),
- "configurationManager");
+ final AtomicBoolean reconf1 = new AtomicBoolean(false);
+ final AtomicBoolean reconf2 = new AtomicBoolean(false);
+ final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null);
+ final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null);
+ CountDownLatch latch = new CountDownLatch(1);
+ Thread clientThread = new Thread(() -> {
+ try {
+ PeerChanges c1 = cluster.addNewPeers(2, true);
+ LOG.info("Start changing the configuration: {}",
+ asList(c1.allPeersInNewConf));
+
+ RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
+ reconf1.set(reply.isSuccess());
+
+ PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
+ finalPeers.set(c2.allPeersInNewConf);
+ deadPeers.set(c2.removedPeers);
+
+ LOG.info("Start changing the configuration again: {}",
+ asList(c2.allPeersInNewConf));
+ reply = client.setConfiguration(c2.allPeersInNewConf);
+ reconf2.set(reply.isSuccess());
+
+ latch.countDown();
+ } catch (Exception ignored) {
+ LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+ }
+ });
+ clientThread.start();
+
+ latch.await();
+ Assert.assertTrue(reconf1.get());
+ Assert.assertTrue(reconf2.get());
+ waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
+ final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId();
+
+ // check configuration manager's internal state
// each reconf will generate two configurations: (old, new) and (new)
- // each leader change generates one configuration.
- // expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
- final int expectedConf = leader2.equals(leaderId)? 6: 7;
- Assert.assertEquals(server.getId() + ": " + confManager, expectedConf, confManager.numOfConf());
- });
+ cluster.getServerAliveStream().forEach(server -> {
+ ConfigurationManager confManager =
+ (ConfigurationManager) Whitebox.getInternalState(server.getState(),
+ "configurationManager");
+ // each reconf will generate two configurations: (old, new) and (new)
+ // each leader change generates one configuration.
+ // expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
+ final int expectedConf = leader2.equals(leaderId) ? 6 : 7;
+ Assert.assertEquals(server.getId() + ": " + confManager, expectedConf, confManager.numOfConf());
+ });
+ }
}
@Test
@@ -258,39 +258,39 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
void runTestBootstrapReconf(CLUSTER cluster) throws Exception {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
- final RaftClient client = cluster.createClient(leaderId);
-
- // submit some msgs before reconf
- for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
- RaftClientReply reply = client.send(new SimpleMessage("m" + i));
- Assert.assertTrue(reply.isSuccess());
- }
+ try (final RaftClient client = cluster.createClient(leaderId)) {
- PeerChanges c1 = cluster.addNewPeers(2, true);
- LOG.info("Start changing the configuration: {}",
- asList(c1.allPeersInNewConf));
- final AtomicReference<Boolean> success = new AtomicReference<>();
-
- Thread clientThread = new Thread(() -> {
- try {
- RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
- success.set(reply.isSuccess());
- client.close();
- } catch (IOException ioe) {
- LOG.error("FAILED", ioe);
+ // submit some msgs before reconf
+ for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
+ RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+ Assert.assertTrue(reply.isSuccess());
}
- });
- clientThread.start();
- Thread.sleep(5000);
- LOG.info(cluster.printServers());
- assertSuccess(success);
+ PeerChanges c1 = cluster.addNewPeers(2, true);
+ LOG.info("Start changing the configuration: {}",
+ asList(c1.allPeersInNewConf));
+ final AtomicReference<Boolean> success = new AtomicReference<>();
+
+ Thread clientThread = new Thread(() -> {
+ try {
+ RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
+ success.set(reply.isSuccess());
+ } catch (IOException ioe) {
+ LOG.error("FAILED", ioe);
+ }
+ });
+ clientThread.start();
+
+ Thread.sleep(5000);
+ LOG.info(cluster.printServers());
+ assertSuccess(success);
- final RaftLog leaderLog = cluster.getLeader().getState().getLog();
- for (RaftPeer newPeer : c1.newPeers) {
- Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
- cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
- .getEntries(0, Long.MAX_VALUE));
+ final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+ for (RaftPeer newPeer : c1.newPeers) {
+ Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
+ cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
+ .getEntries(0, Long.MAX_VALUE));
+ }
}
}
@@ -577,26 +577,29 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
AtomicBoolean success = new AtomicBoolean(false);
final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId();
new Thread(() -> {
- final RaftClient client = cluster.createClient(leaderId);
- final RaftClientRpc sender = client.getClientRpc();
- final RaftClientRequest request = cluster.newRaftClientRequest(
- client.getId(), leaderId, new SimpleMessage("test"));
- while (!success.get()) {
- try {
- final RaftClientReply reply = sender.sendRequest(request);
- success.set(reply.isSuccess());
- if (reply.getException() != null && reply.getException() instanceof LeaderNotReadyException) {
- caughtNotReady.set(true);
- }
- } catch (IOException e) {
- LOG.info("Hit other IOException", e);
- }
- if (!success.get()) {
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ final RaftClientRpc sender = client.getClientRpc();
+ final RaftClientRequest request = cluster.newRaftClientRequest(
+ client.getId(), leaderId, new SimpleMessage("test"));
+ while (!success.get()) {
try {
- Thread.sleep(200);
- } catch (InterruptedException ignored) {
+ final RaftClientReply reply = sender.sendRequest(request);
+ success.set(reply.isSuccess());
+ if (reply.getException() != null && reply.getException() instanceof LeaderNotReadyException) {
+ caughtNotReady.set(true);
+ }
+ } catch (IOException e) {
+ LOG.info("Hit other IOException", e);
+ }
+ if (!success.get()) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ignored) {
+ }
}
}
+ } catch (Exception ignored) {
+ LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
}
}).start();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index e94b43d..329c8a3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -105,39 +105,39 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
cluster.getLeaderAndSendFirstMessage(true);
long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
- final RaftClient client = cluster.createClient(leaderId);
- final RaftClientRpc rpc = client.getClientRpc();
- final long callId = 999;
- final SimpleMessage message = new SimpleMessage("message");
- final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, message);
- RaftClientReply reply = rpc.sendRequest(r);
- Assert.assertFalse(reply.isSuccess());
- Assert.assertNotNull(reply.getStateMachineException());
-
- // retry with the same callId
- for (int i = 0; i < 5; i++) {
- reply = rpc.sendRequest(r);
- Assert.assertEquals(client.getId(), reply.getClientId());
- Assert.assertEquals(callId, reply.getCallId());
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ final RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ final SimpleMessage message = new SimpleMessage("message");
+ final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, message);
+ RaftClientReply reply = rpc.sendRequest(r);
Assert.assertFalse(reply.isSuccess());
Assert.assertNotNull(reply.getStateMachineException());
- }
- long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
- // make sure retry cache has the entry
- for (RaftServerImpl server : cluster.iterateServerImpls()) {
- LOG.info("check server " + server.getId());
- if (server.getState().getLastAppliedIndex() < leaderApplied) {
- Thread.sleep(1000);
+ // retry with the same callId
+ for (int i = 0; i < 5; i++) {
+ reply = rpc.sendRequest(r);
+ Assert.assertEquals(client.getId(), reply.getClientId());
+ Assert.assertEquals(callId, reply.getCallId());
+ Assert.assertFalse(reply.isSuccess());
+ Assert.assertNotNull(reply.getStateMachineException());
}
- Assert.assertNotNull(
- RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
- final RaftLog log = server.getState().getLog();
- RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
- }
- client.close();
- cluster.shutdown();
+ long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ // make sure retry cache has the entry
+ for (RaftServerImpl server : cluster.iterateServerImpls()) {
+ LOG.info("check server " + server.getId());
+ if (server.getState().getLastAppliedIndex() < leaderApplied) {
+ Thread.sleep(1000);
+ }
+ Assert.assertNotNull(
+ RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+ final RaftLog log = server.getState().getLog();
+ RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
+ }
+
+ cluster.shutdown();
+ }
}
@Test
@@ -150,31 +150,31 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
cluster.getLeaderAndSendFirstMessage(true);
// turn on the preAppend failure switch
failPreAppend = true;
- final RaftClient client = cluster.createClient(oldLeader.getId());
- final RaftClientRpc rpc = client.getClientRpc();
- final long callId = 999;
- final SimpleMessage message = new SimpleMessage("message");
- RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), oldLeader.getId(), callId, message);
- RaftClientReply reply = rpc.sendRequest(r);
- Objects.requireNonNull(reply.getStateMachineException());
-
- final RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(oldLeader, client.getId(), callId);
- Assert.assertNotNull(oldEntry);
- Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
-
- // At this point of time the old leader would have stepped down. wait for leader election to complete
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
- // retry
- r = cluster.newRaftClientRequest(client.getId(), leader.getId(), callId, message);
- reply = rpc.sendRequest(r);
- Objects.requireNonNull(reply.getStateMachineException());
-
- RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
- leader, client.getId(), callId);
- Assert.assertNotNull(currentEntry);
- Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
- Assert.assertNotEquals(oldEntry, currentEntry);
- failPreAppend = false;
- client.close();
+ try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
+ final RaftClientRpc rpc = client.getClientRpc();
+ final long callId = 999;
+ final SimpleMessage message = new SimpleMessage("message");
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), oldLeader.getId(), callId, message);
+ RaftClientReply reply = rpc.sendRequest(r);
+ Objects.requireNonNull(reply.getStateMachineException());
+
+ final RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(oldLeader, client.getId(), callId);
+ Assert.assertNotNull(oldEntry);
+ Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
+
+ // At this point of time the old leader would have stepped down. wait for leader election to complete
+ final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ // retry
+ r = cluster.newRaftClientRequest(client.getId(), leader.getId(), callId, message);
+ reply = rpc.sendRequest(r);
+ Objects.requireNonNull(reply.getStateMachineException());
+
+ RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
+ leader, client.getId(), callId);
+ Assert.assertNotNull(currentEntry);
+ Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
+ Assert.assertNotEquals(oldEntry, currentEntry);
+ failPreAppend = false;
+ }
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 7858f5a..9c1fe09 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -89,44 +89,43 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
cluster.getLeaderAndSendFirstMessage(true);
- final RaftClient client = cluster.createClient(leaderId);
- client.send(new RaftTestUtil.SimpleMessage("message"));
- RaftClientReply reply = client.send(
- new RaftTestUtil.SimpleMessage("message2"));
-
- long logIndex = reply.getLogIndex();
- //Confirm that followers have committed
- RaftClientReply watchReply = client.sendWatch(
- logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
- watchReply.getCommitInfos().forEach(
- val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
-
- RaftServerImpl secondFollower = cluster.getFollowers().get(1);
- // Second follower is blocked in apply transaction
- Assert.assertTrue(
- secondFollower.getState().getLastAppliedIndex()
- < logIndex);
-
- // Now shutdown the follower in a separate thread
- Thread t = new Thread(() -> secondFollower.shutdown(true));
- t.start();
-
- // The second follower should still be blocked in apply transaction
- Assert.assertTrue(
- secondFollower.getState().getLastAppliedIndex()
- < logIndex);
-
- // Now unblock the second follower
- ((StateMachineWithConditionalWait)secondFollower.getStateMachine())
- .unBlockApplyTxn();
-
- // Now wait for the thread
- t.join(5000);
- Assert.assertEquals(
- secondFollower.getState().getLastAppliedIndex(),
- logIndex);
-
- client.close();
- cluster.shutdown();
+ try (final RaftClient client = cluster.createClient(leaderId)) {
+ client.send(new RaftTestUtil.SimpleMessage("message"));
+ RaftClientReply reply = client.send(
+ new RaftTestUtil.SimpleMessage("message2"));
+
+ long logIndex = reply.getLogIndex();
+ //Confirm that followers have committed
+ RaftClientReply watchReply = client.sendWatch(
+ logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+ watchReply.getCommitInfos().forEach(
+ val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
+ RaftServerImpl secondFollower = cluster.getFollowers().get(1);
+ // Second follower is blocked in apply transaction
+ Assert.assertTrue(
+ secondFollower.getState().getLastAppliedIndex()
+ < logIndex);
+
+ // Now shutdown the follower in a separate thread
+ Thread t = new Thread(() -> secondFollower.shutdown(true));
+ t.start();
+
+ // The second follower should still be blocked in apply transaction
+ Assert.assertTrue(
+ secondFollower.getState().getLastAppliedIndex()
+ < logIndex);
+
+ // Now unblock the second follower
+ ((StateMachineWithConditionalWait) secondFollower.getStateMachine())
+ .unBlockApplyTxn();
+
+ // Now wait for the thread
+ t.join(5000);
+ Assert.assertEquals(
+ secondFollower.getState().getLastAppliedIndex(),
+ logIndex);
+
+ cluster.shutdown();
+ }
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 50c1c13..18dab32 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -62,33 +62,34 @@ public class TestLogAppenderWithGrpc
cluster.start();
// client and leader setup
- RaftClient client = cluster.createClient(cluster.getGroup());
- client.send(new RaftTestUtil.SimpleMessage("m"));
- RaftServerImpl leader = waitForLeader(cluster);
- long initialNextIndex = leader.getState().getNextIndex();
-
- for (RaftServerImpl server : cluster.getFollowers()) {
- // block the appends in the follower
- ((SimpleStateMachine4Testing)server.getStateMachine()).blockWriteStateMachineData();
- }
- Collection<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(maxAppends * 2);
- for (int i = 0; i < maxAppends * 2; i++) {
- futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage("m")));
- }
+ try (final RaftClient client = cluster.createClient(cluster.getGroup())) {
+ client.send(new RaftTestUtil.SimpleMessage("m"));
+ RaftServerImpl leader = waitForLeader(cluster);
+ long initialNextIndex = leader.getState().getNextIndex();
+
+ for (RaftServerImpl server : cluster.getFollowers()) {
+ // block the appends in the follower
+ ((SimpleStateMachine4Testing) server.getStateMachine()).blockWriteStateMachineData();
+ }
+ Collection<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(maxAppends * 2);
+ for (int i = 0; i < maxAppends * 2; i++) {
+ futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage("m")));
+ }
- FIVE_SECONDS.sleep();
- for (long nextIndex : leader.getFollowerNextIndices()) {
- // Verify nextIndex does not progress due to pendingRequests limit
- Assert.assertEquals(initialNextIndex + maxAppends, nextIndex);
- }
- ONE_SECOND.sleep();
- for (RaftServerImpl server : cluster.getFollowers()) {
- // unblock the appends in the follower
- ((SimpleStateMachine4Testing)server.getStateMachine()).unblockWriteStateMachineData();
- }
+ FIVE_SECONDS.sleep();
+ for (long nextIndex : leader.getFollowerNextIndices()) {
+ // Verify nextIndex does not progress due to pendingRequests limit
+ Assert.assertEquals(initialNextIndex + maxAppends, nextIndex);
+ }
+ ONE_SECOND.sleep();
+ for (RaftServerImpl server : cluster.getFollowers()) {
+ // unblock the appends in the follower
+ ((SimpleStateMachine4Testing) server.getStateMachine()).unblockWriteStateMachineData();
+ }
- JavaUtils.allOf(futures).join();
- cluster.shutdown();
+ JavaUtils.allOf(futures).join();
+ cluster.shutdown();
+ }
}
@Test
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index aa1cca8..85d1171 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -66,7 +66,9 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.SortedMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -219,37 +221,50 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
String message = "2nd Message";
// Block stateMachine flush data, so that 2nd request will not be
// completed, and so it will not be removed from pending request map.
- cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()).sendAsync(new SimpleMessage(message));
+ List<RaftClient> clients = new ArrayList<>();
- final SortedMap< String, Gauge > gaugeMap =
- cluster.getLeader().getRaftServerMetrics().getRegistry()
- .getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
+ try {
+ RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+ clients.add(client);
+ client.sendAsync(new SimpleMessage(message));
- RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
- 300, 5000);
+ final SortedMap<String, Gauge> gaugeMap =
+ cluster.getLeader().getRaftServerMetrics().getRegistry()
+ .getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
- for (int i = 0; i < 10; i++) {
- cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()).sendAsync(new SimpleMessage(message));
- }
+ RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
+ 300, 5000);
+
+ for (int i = 0; i < 10; i++) {
+ client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+ clients.add(client);
+ client.sendAsync(new SimpleMessage(message));
+ }
- // Because we have passed 11 requests, and the element queue size is 10.
- RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics().getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER)
- .getCount() == 1, 300, 5000);
+ // Because we have passed 11 requests, and the element queue size is 10.
+ RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics().getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER)
+ .getCount() == 1, 300, 5000);
- stateMachine.unblockFlushStateMachineData();
+ stateMachine.unblockFlushStateMachineData();
- // Send a message with 120, our byte size limit is 110, so it should fail
- // and byte size counter limit will be hit.
+ // Send a message with 120, our byte size limit is 110, so it should fail
+ // and byte size counter limit will be hit.
- cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry())
- .sendAsync(new SimpleMessage(RandomStringUtils.random(120, true, false)));
+ client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+ // Track the client before sending so the finally block closes it even if sendAsync throws.
+ clients.add(client);
+ client.sendAsync(new SimpleMessage(RandomStringUtils.random(120, true, false)));
- RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
- .getCounter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
+ RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
+ .getCounter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
- Assert.assertEquals(2, cluster.getLeader().getRaftServerMetrics()
- .getCounter(RESOURCE_LIMIT_HIT_COUNTER).getCount());
+ Assert.assertEquals(2, cluster.getLeader().getRaftServerMetrics()
+ .getCounter(RESOURCE_LIMIT_HIT_COUNTER).getCount());
+ } finally {
+ for (RaftClient client : clients) {
+ client.close();
+ }
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 3d374a8..e96a418 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -184,13 +184,14 @@ public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.Fact
builder.setDefaultPolicy(RetryPolicies.retryForeverNoSleep());
// create a client with the exception dependent policy
- RaftClient client = cluster.createClient(builder.build());
- client.sendAsync(new RaftTestUtil.SimpleMessage("1")).get();
+ try (final RaftClient client = cluster.createClient(builder.build())) {
+ client.sendAsync(new RaftTestUtil.SimpleMessage("1")).get();
- leader = cluster.getLeader();
- ((SimpleStateMachine4Testing)leader.getStateMachine()).blockWriteStateMachineData();
+ leader = cluster.getLeader();
+ ((SimpleStateMachine4Testing) leader.getStateMachine()).blockWriteStateMachineData();
- client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();
+ client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();
+ }
Assert.fail("Test should have failed.");
} catch (ExecutionException e) {
RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause();
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index ab277de..9a6c816 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -237,16 +237,14 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
futures.add(f);
final SimpleMessage m = messages[i];
- try (final RaftClient client = cluster.createClient()) {
- new Thread(() -> {
- try {
- Assert.assertTrue(client.send(m).isSuccess());
- } catch (IOException e) {
- throw new IllegalStateException("Failed to send " + m, e);
- }
- f.complete(null);
- }).start();
- }
+ new Thread(() -> {
+ // Send one message on a dedicated client; the client is closed when the try block exits.
+ try (final RaftClient client = cluster.createClient()) {
+ Assert.assertTrue(client.send(m).isSuccess());
+ } catch (IOException | RuntimeException | AssertionError e) {
+ // Forward the failure to the future; otherwise allOf(futures).get() would wait forever.
+ f.completeExceptionally(new IllegalStateException("Failed to send " + m, e));
+ return;
+ }
+ f.complete(null);
+ }).start();
}
JavaUtils.allOf(futures).get();
diff --git a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 174ba8c..d0b486a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -185,9 +185,10 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim
for(RaftGroupId gid : registry.keySet()) {
final RaftGroup newGroup = RaftGroup.valueOf(gid, cluster.getPeers());
LOG.info("add new group: " + newGroup);
- final RaftClient client = cluster.createClient(newGroup);
- for(RaftPeer p : newGroup.getPeers()) {
- client.groupAdd(newGroup, p.getId());
+ try (final RaftClient client = cluster.createClient(newGroup)) {
+ for (RaftPeer p : newGroup.getPeers()) {
+ client.groupAdd(newGroup, p.getId());
+ }
}
}