Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/11 07:56:54 UTC

[incubator-ratis] branch master updated: RATIS-921. Fix resource leak by closing thousands of gRPC clients after use (#99)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 965bcec  RATIS-921. Fix resource leak by closing thousands of gRPC clients after use (#99)
965bcec is described below

commit 965bcec41ccc36056f029becb743d8f24b420d76
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Jun 11 15:56:44 2020 +0800

    RATIS-921. Fix resource leak by closing thousands of gRPC clients after use (#99)
---
 .../main/java/org/apache/ratis/grpc/GrpcUtil.java  |  35 +++
 .../grpc/client/GrpcClientProtocolClient.java      |   8 +-
 .../grpc/server/GrpcServerProtocolClient.java      |   9 +-
 .../ratis/logservice/impl/LogStreamImpl.java       |   1 +
 .../apache/ratis/logservice/server/LogServer.java  |   1 +
 .../ratis/logservice/server/LogStateMachine.java   |   7 +
 .../ratis/logservice/server/MetaStateMachine.java  |  38 +--
 .../ratis/logservice/LogServiceReadWriteBase.java  |  52 ++--
 .../ratis/logservice/server/TestMetaServer.java    | 300 ++++++++++++---------
 .../java/org/apache/ratis/LogAppenderTests.java    |  46 ++--
 .../test/java/org/apache/ratis/RaftAsyncTests.java |  81 +++---
 .../java/org/apache/ratis/RetryCacheTests.java     |  96 +++----
 .../ratis/server/impl/GroupManagementBaseTest.java |  28 +-
 .../server/impl/RaftReconfigurationBaseTest.java   | 201 +++++++-------
 .../impl/RaftStateMachineExceptionTests.java       | 108 ++++----
 .../server/impl/StateMachineShutdownTests.java     |  77 +++---
 .../apache/ratis/grpc/TestLogAppenderWithGrpc.java |  51 ++--
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  |  57 ++--
 .../ratis/retry/TestExceptionDependentRetry.java   |  11 +-
 .../apache/ratis/server/ServerRestartTests.java    |  18 +-
 .../ratis/statemachine/TestStateMachine.java       |   7 +-
 21 files changed, 667 insertions(+), 565 deletions(-)
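
For context: the leak came from RaftClient instances (each wrapping a gRPC
ManagedChannel) that were built for one-off calls and never closed. The
recurring fix throughout this patch is to scope each client in a
try-with-resources block so close() runs on every exit path. A minimal sketch
of that pattern, assuming `properties`, `group`, and `peerId` are in scope
(this sketch is not part of the commit itself):

    import java.io.IOException;
    import org.apache.ratis.client.RaftClient;
    import org.apache.ratis.protocol.ClientId;

    // ...
    try (RaftClient client = RaftClient.newBuilder()
        .setProperties(properties)
        .setClientId(ClientId.randomId())
        .setRaftGroup(group)
        .build()) {
      client.groupAdd(group, peerId);  // use the client inside the block
    } catch (IOException e) {
      // close() has already run by this point, even on the exception path
    }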

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 0856a42..28b7cc7 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -20,6 +20,7 @@ package org.apache.ratis.grpc;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.ServerNotReadyException;
 import org.apache.ratis.protocol.TimeoutIOException;
+import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.Metadata;
 import org.apache.ratis.thirdparty.io.grpc.Status;
 import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
@@ -30,13 +31,17 @@ import org.apache.ratis.util.LogUtils;
 import org.apache.ratis.util.ReflectionUtils;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
 public interface GrpcUtil {
+  static final Logger LOG = LoggerFactory.getLogger(GrpcUtil.class);
+
   Metadata.Key<String> EXCEPTION_TYPE_KEY =
       Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
   Metadata.Key<byte[]> EXCEPTION_OBJECT_KEY =
@@ -198,4 +203,34 @@ public interface GrpcUtil {
       return trailers;
     }
   }
+
+  /**
+   * Tries to gracefully shut down the managed channel. Falls back to forceful shutdown if
+   * graceful shutdown times out.
+   */
+  static void shutdownManagedChannel(ManagedChannel managedChannel) {
+    // Close the gRPC managed-channel if not shut down already.
+    if (!managedChannel.isShutdown()) {
+      try {
+        managedChannel.shutdown();
+        if (!managedChannel.awaitTermination(3, TimeUnit.SECONDS)) {
+          LOG.warn("Timed out gracefully shutting down connection: {}. ", managedChannel);
+        }
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while waiting for channel termination", e);
+      }
+    }
+
+    // Forceful shut down if still not terminated.
+    if (!managedChannel.isTerminated()) {
+      try {
+        managedChannel.shutdownNow();
+        if (!managedChannel.awaitTermination(2, TimeUnit.SECONDS)) {
+          LOG.warn("Timed out forcefully shutting down connection: {}. ", managedChannel);
+        }
+      } catch (Exception e) {
+        LOG.error("Unexpected exception while waiting for channel termination", e);
+      }
+    }
+  }
 }
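
The helper above also bounds close() latency: up to 3 seconds for a graceful
drain, then up to 2 more after shutdownNow(), roughly matching the flat
5-second awaitTermination it replaces while adding the forceful fallback.
Call sites (see the diffs below) simply delegate to it; a sketch of the
intended usage in a Closeable wrapper:

    @Override
    public void close() {
      // Two-phase shutdown: graceful first, forceful if still not terminated.
      GrpcUtil.shutdownManagedChannel(channel);
    }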
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index be9c7b2..1d6860a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -66,7 +66,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -143,12 +142,7 @@ public class GrpcClientProtocolClient implements Closeable {
   public void close() {
     Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
-    channel.shutdown();
-    try {
-      channel.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.error("Unexpected exception while waiting for channel termination", e);
-    }
+    GrpcUtil.shutdownManagedChannel(channel);
     scheduler.close();
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
index cb46f99..399089a 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcServerProtocolClient.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.grpc.server;
 
 import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.thirdparty.io.grpc.ManagedChannel;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
@@ -35,7 +36,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
-import java.util.concurrent.TimeUnit;
 
 /**
  * This is a RaftClient implementation that supports streaming data to the raft
@@ -89,12 +89,7 @@ public class GrpcServerProtocolClient implements Closeable {
 
   @Override
   public void close() {
-    channel.shutdown();
-    try {
-      channel.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.error("Unexpected exception while waiting for channel termination, peerId={}", raftPeerId, e);
-    }
+    GrpcUtil.shutdownManagedChannel(channel);
   }
 
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
index 6f73fa4..7980de7 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/impl/LogStreamImpl.java
@@ -192,6 +192,7 @@ public class LogStreamImpl implements LogStream {
   @Override
   public void close() throws Exception {
     // TODO Auto-generated method stub
+    raftClient.close();
     state = State.CLOSED;
   }
 
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
index 22ac6cc..b68a763 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogServer.java
@@ -176,6 +176,7 @@ public class LogServer extends BaseServer {
 
 
     public void close() throws IOException {
+        metaClient.close();
         raftServer.close();
         daemon.interrupt();
     }
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 3a0d779..02e9234 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -485,6 +485,13 @@ public class LogStateMachine extends BaseStateMachine {
   public void close() {
     reset();
     logServiceMetrics.unregister();
+    if (client != null) {
+      try {
+        client.close();
+      } catch (Exception ignored) {
+        LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+      }
+    }
   }
 
   @Override
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
index 1313c32..a0ba191 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/MetaStateMachine.java
@@ -261,21 +261,15 @@ public class MetaStateMachine extends BaseStateMachine {
         } else {
             Collection<RaftPeer> raftPeers = raftGroup.getPeers();
             raftPeers.stream().forEach(peer -> {
-                RaftClient client = RaftClient.newBuilder().setProperties(properties)
-                    .setClientId(ClientId.randomId())
-                    .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build();
-                try {
+                try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
+                    .setClientId(ClientId.randomId()).setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()){
                     client.groupRemove(raftGroup.getGroupId(), true, peer.getId());
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             });
-            RaftClient client = RaftClient.newBuilder()
-                    .setRaftGroup(currentGroup)
-                    .setClientId(ClientId.randomId())
-                    .setProperties(properties)
-                    .build();
-            try {
+            try (RaftClient client = RaftClient.newBuilder().setRaftGroup(currentGroup)
+                .setClientId(ClientId.randomId()).setProperties(properties).build()){
                 client.send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder()
                         .setUnregisterRequest(
                                 LogServiceUnregisterLogRequestProto.newBuilder()
@@ -323,9 +317,8 @@ public class MetaStateMachine extends BaseStateMachine {
                 int provisionedPeers = 0;
                 Exception originalException = null;
                 for (RaftPeer peer : peers) {
-                    RaftClient client = RaftClient.newBuilder().setProperties(properties)
-                        .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build();
-                    try {
+                    try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
+                        .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
                         client.groupAdd(raftGroup, peer.getId());
                     } catch (IOException e) {
                         LOG.error("Failed to add Raft group ({}) for new Log({})",
@@ -343,9 +336,8 @@ public class MetaStateMachine extends BaseStateMachine {
                         if (tornDownPeers >= provisionedPeers) {
                             break;
                         }
-                        RaftClient client = RaftClient.newBuilder().setProperties(properties)
-                            .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build();
-                        try {
+                        try (RaftClient client = RaftClient.newBuilder().setProperties(properties)
+                            .setRaftGroup(RaftGroup.valueOf(logServerGroupId, peer)).build()) {
                             client.groupRemove(raftGroup.getGroupId(), true, peer.getId());
                         } catch (IOException e) {
                             LOG.error("Failed to clean up Raft group ({}) for peer ({}), "
@@ -358,12 +350,8 @@ public class MetaStateMachine extends BaseStateMachine {
                         MetaServiceProtoUtil.toCreateLogExceptionReplyProto(originalException)
                             .build().toByteString()));
                 }
-                RaftClient client = RaftClient.newBuilder()
-                        .setRaftGroup(currentGroup)
-                        .setClientId(ClientId.randomId())
-                        .setProperties(properties)
-                        .build();
-                try {
+                try (RaftClient client = RaftClient.newBuilder().setRaftGroup(currentGroup)
+                    .setClientId(ClientId.randomId()).setProperties(properties).build()){
                     client.send(() -> MetaServiceProtos.MetaSMRequestProto.newBuilder()
                             .setRegisterRequest(LogServiceRegisterLogRequestProto.newBuilder()
                                     .setLogname(LogServiceProtoUtil.toLogNameProto(name))
@@ -458,9 +446,8 @@ public class MetaStateMachine extends BaseStateMachine {
                                 while(itr.hasNext()) {
                                     LogName logName = itr.next();
                                     RaftGroup group = map.get(logName);
-                                    RaftClient client = RaftClient.newBuilder().
-                                            setRaftGroup(group).setProperties(properties).build();
-                                    try {
+                                    try (RaftClient client = RaftClient.newBuilder()
+                                        .setRaftGroup(group).setProperties(properties).build()) {
                                         LOG.warn(String.format("Peer %s in the group %s went down." +
                                                         " Hence closing the log %s serve by the group.",
                                                 raftPeer.toString(), group.toString(), logName.toString()));
@@ -475,7 +462,6 @@ public class MetaStateMachine extends BaseStateMachine {
                                             throw new IOException(message.getException().getErrorMsg());
                                         }
                                         itr.remove();
-                                        client.close();
                                     } catch (IOException e) {
                                         LOG.warn(String.format("Failed to close log %s on peer %s failure.",
                                                 logName, raftPeer.toString()), e);
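
One detail worth noting in the last hunk above: the explicit client.close()
on the success path is removed because try-with-resources now closes the
client on every exit, including when an IOException is thrown mid-block (an
exception from close() itself would be attached as suppressed). A minimal
illustration, with a hypothetical newClient() helper standing in for the
builder chain:

    try (RaftClient client = newClient()) {
      client.groupRemove(groupId, true, peerId);  // may throw IOException
    }                                             // close() always runs first
    // behaves like:
    RaftClient client = newClient();
    try {
      client.groupRemove(groupId, true, peerId);
    } finally {
      client.close();
    }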
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
index c579990..84aba03 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/LogServiceReadWriteBase.java
@@ -125,12 +125,12 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testLogServiceReadWrite() throws Exception {
-    RaftClient raftClient =
-        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
-            .build();
-    LogName logName = LogName.of("log1");
-    // TODO need API to circumvent metadata service for testing
-    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+    try (RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties())
+            .setRaftGroup(cluster.getGroup()).build()) {
+      LogName logName = LogName.of("log1");
+      // TODO need API to circumvent metadata service for testing
+      LogStream logStream = new MetricLogStream(logName, raftClient);
       assertEquals("log1", logStream.getName().getName());
       assertEquals(State.OPEN, logStream.getState());
       assertEquals(0, logStream.getSize());
@@ -181,13 +181,13 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testReadAllRecords() throws Exception {
-    final RaftClient raftClient =
-        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
-            .build();
-    final LogName logName = LogName.of("log1");
-    final int numRecords = 25;
-    // TODO need API to circumvent metadata service for testing
-    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+    try (RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties())
+            .setRaftGroup(cluster.getGroup()).build()) {
+      final LogName logName = LogName.of("log1");
+      final int numRecords = 25;
+      // TODO need API to circumvent metadata service for testing
+      LogStream logStream = new MetricLogStream(logName, raftClient);
       try (LogWriter writer = logStream.createWriter()) {
         LOG.info("Writing {} records", numRecords);
         // Write records 0 through 99 (inclusive)
@@ -222,13 +222,13 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testSeeking() throws Exception {
-    final RaftClient raftClient =
-        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
-            .build();
-    final LogName logName = LogName.of("log1");
-    final int numRecords = 100;
-    // TODO need API to circumvent metadata service for testing
-    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+    try (final RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties())
+            .setRaftGroup(cluster.getGroup()).build()) {
+      final LogName logName = LogName.of("log1");
+      final int numRecords = 100;
+      // TODO need API to circumvent metadata service for testing
+      LogStream logStream = new MetricLogStream(logName, raftClient);
       try (LogWriter writer = logStream.createWriter()) {
         LOG.info("Writing {} records", numRecords);
         // Write records 0 through 99 (inclusive)
@@ -259,12 +259,12 @@ public abstract class LogServiceReadWriteBase<CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testSeekFromWrite() throws Exception {
-    final RaftClient raftClient =
-        RaftClient.newBuilder().setProperties(getProperties()).setRaftGroup(cluster.getGroup())
-            .build();
-    final LogName logName = LogName.of("log1");
-    final int numRecords = 10;
-    try (LogStream logStream = new MetricLogStream(logName, raftClient)) {
+    try (final RaftClient raftClient =
+        RaftClient.newBuilder().setProperties(getProperties())
+            .setRaftGroup(cluster.getGroup()).build()) {
+      final LogName logName = LogName.of("log1");
+      final int numRecords = 10;
+      LogStream logStream = new MetricLogStream(logName, raftClient);
       final List<Long> recordIds;
       try (LogWriter writer = logStream.createWriter()) {
         LOG.info("Writing {} records", numRecords);
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 03c6e80..c911fad 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -39,6 +39,8 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -55,6 +57,8 @@ import javax.management.ObjectName;
 import static org.junit.Assert.*;
 
 public class TestMetaServer {
+    private static final Logger LOG = LoggerFactory.getLogger(TestMetaServer.class);
+
     static {
         JVMMetrics.initJvmMetrics(TimeDuration.valueOf(10, TimeUnit.SECONDS));
     }
@@ -129,10 +133,12 @@ public class TestMetaServer {
     @Test
     public void testCreateAndGetLog() throws Exception {
         // This should be LogServiceStream ?
-        LogStream logStream1 = client.createLog(LogName.of("testCreateLog"));
-        assertNotNull(logStream1);
-        LogStream logStream2 = client.getLog(LogName.of("testCreateLog"));
-        assertNotNull(logStream2);
+        try (LogStream logStream1 = client.createLog(LogName.of("testCreateLog"))) {
+            assertNotNull(logStream1);
+        }
+        try (LogStream logStream2 = client.getLog(LogName.of("testCreateLog"))) {
+            assertNotNull(logStream2);
+        }
     }
 
     /**
@@ -144,8 +150,9 @@ public class TestMetaServer {
         boolean peerClosed = false;
         try {
             for(int i = 0; i < 5; i++) {
-                LogStream logStream1 = client.createLog(LogName.of("testCloseLogOnNodeFailure"+i));
-                assertNotNull(logStream1);
+                try (LogStream logStream1 = client.createLog(LogName.of("testCloseLogOnNodeFailure"+i))) {
+                    assertNotNull(logStream1);
+                }
             }
             assertTrue(((MetaStateMachine)cluster.getMasters().get(0).getMetaStateMachine()).checkPeersAreSame());
             workers.get(0).close();
@@ -153,9 +160,10 @@ public class TestMetaServer {
             Thread.sleep(90000);
             assertTrue(((MetaStateMachine)cluster.getMasters().get(0).getMetaStateMachine()).checkPeersAreSame());
             for(int i = 0; i < 5; i++) {
-                LogStream logStream2 = client.getLog(LogName.of("testCloseLogOnNodeFailure"+i));
-                assertNotNull(logStream2);
-                assertEquals(State.CLOSED, logStream2.getState());
+                try (LogStream logStream2 = client.getLog(LogName.of("testCloseLogOnNodeFailure"+i))) {
+                    assertNotNull(logStream2);
+                    assertEquals(State.CLOSED, logStream2.getState());
+                }
             }
         } finally {
             if(peerClosed) {
@@ -166,108 +174,111 @@ public class TestMetaServer {
     }
 
     @Test
-    public void testReadWritetoLog() throws IOException, InterruptedException {
-        LogStream stream = client.createLog(LogName.of("testReadWrite"));
-        LogWriter writer = stream.createWriter();
-        ByteBuffer testMessage =  ByteBuffer.wrap("Hello world!".getBytes());
-        List<LogInfo> listLogs = client.listLogs();
-        assert(listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1);
-        List<LogServer> workers = cluster.getWorkers();
-        for(LogServer worker : workers) {
-             RaftServerImpl server = ((RaftServerProxy)worker.getServer())
-                     .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
-        // TODO: perform all additional checks on state machine level
-        }
-        writer.write(testMessage);
-        for(LogServer worker : workers) {
-            RaftServerImpl server = ((RaftServerProxy)worker.getServer())
-                    .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
-        }
+    public void testReadWritetoLog() throws Exception {
+        try (LogStream stream = client.createLog(LogName.of("testReadWrite"))) {
+            LogWriter writer = stream.createWriter();
+            ByteBuffer testMessage = ByteBuffer.wrap("Hello world!".getBytes());
+            List<LogInfo> listLogs = client.listLogs();
+            assert (listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1);
+            List<LogServer> workers = cluster.getWorkers();
+            for (LogServer worker : workers) {
+                RaftServerImpl server = ((RaftServerProxy) worker.getServer())
+                        .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
+                // TODO: perform all additional checks on state machine level
+            }
+            writer.write(testMessage);
+            for (LogServer worker : workers) {
+                RaftServerImpl server = ((RaftServerProxy) worker.getServer())
+                        .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
+            }
 //        assert(stream.getSize() > 0); //TODO: Doesn't work
-        LogReader reader = stream.createReader();
-        ByteBuffer res = reader.readNext();
-        assert(res.array().length > 0);
+            LogReader reader = stream.createReader();
+            ByteBuffer res = reader.readNext();
+            assert (res.array().length > 0);
+        }
     }
 
     @Test
-    public void testLogArchival() throws IOException, InterruptedException {
+    public void testLogArchival() throws Exception {
         LogName logName = LogName.of("testArchivalLog");
-        LogStream logStream = client.createLog(logName);
-        LogWriter writer = logStream.createWriter();
-        List<LogInfo> listLogs = client.listLogs();
-        assert (listLogs.stream()
-            .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
-        List<LogServer> workers = cluster.getWorkers();
-        List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
-        writer.write(records);
-        client.closeLog(logName);
-        assertEquals(logStream.getState(), State.CLOSED);
-        client.archiveLog(logName);
-        int retry = 0;
-        while (logStream.getState() != State.ARCHIVED && retry <= 40) {
-            Thread.sleep(1000);
-            retry++;
+        try (LogStream logStream = client.createLog(logName)) {
+            LogWriter writer = logStream.createWriter();
+            List<LogInfo> listLogs = client.listLogs();
+            assert (listLogs.stream()
+                    .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+            List<LogServer> workers = cluster.getWorkers();
+            List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+            writer.write(records);
+            client.closeLog(logName);
+            assertEquals(logStream.getState(), State.CLOSED);
+            client.archiveLog(logName);
+            int retry = 0;
+            while (logStream.getState() != State.ARCHIVED && retry <= 40) {
+                Thread.sleep(1000);
+                retry++;
+            }
+            assertEquals(logStream.getState(), State.ARCHIVED);
+            LogReader reader = logStream.createReader();
+            List<ByteBuffer> data = reader.readBulk(records.size());
+            assertEquals(records.size(), data.size());
+            reader.seek(1);
+            data = reader.readBulk(records.size());
+            assertEquals(records.size() - 1, data.size());
+
+            //Test ArchiveLogStream
+            LogServiceConfiguration config = LogServiceConfiguration.create();
+            LogStream archiveLogStream = client.getArchivedLog(logName);
+            reader = archiveLogStream.createReader();
+            data = reader.readBulk(records.size());
+            assertEquals(records.size(), data.size());
         }
-        assertEquals(logStream.getState(), State.ARCHIVED);
-        LogReader reader = logStream.createReader();
-        List<ByteBuffer> data = reader.readBulk(records.size());
-        assertEquals(records.size(), data.size());
-        reader.seek(1);
-        data = reader.readBulk(records.size());
-        assertEquals(records.size() - 1, data.size());
-
-        //Test ArchiveLogStream
-        LogServiceConfiguration config = LogServiceConfiguration.create();
-        LogStream archiveLogStream = client.getArchivedLog(logName);
-        reader = archiveLogStream.createReader();
-        data = reader.readBulk(records.size());
-        assertEquals(records.size(), data.size());
     }
 
     @Test
-    public void testLogExport() throws IOException, InterruptedException {
+    public void testLogExport() throws Exception {
         LogName logName = LogName.of("testLogExport");
-        LogStream logStream = client.createLog(logName);
-        LogWriter writer = logStream.createWriter();
-        List<LogInfo> listLogs = client.listLogs();
-        assert (listLogs.stream()
-            .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
-        List<LogServer> workers = cluster.getWorkers();
-        List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
-        writer.write(records);
-        String location1 = "target/tmp/export_1/";
-        String location2 = "target/tmp/export_2/";
-        deleteLocalDirectory(new File(location1));
-        deleteLocalDirectory(new File(location2));
-        int startPosition1 = 3;
-        int startPosition2 = 5;
-        client.exportLog(logName, location1, startPosition1);
-        client.exportLog(logName, location2, startPosition2);
-        List<ArchivalInfo> infos=client.getExportStatus(logName);
-        int count=0;
-        while (infos.size() > 1 && (
-            infos.get(0).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED
-                || infos.get(1).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED)
-            && count < 10) {
-            infos = client.getExportStatus(logName);
-            ;
-            Thread.sleep(1000);
-            count++;
+        try (LogStream logStream = client.createLog(logName)) {
+            LogWriter writer = logStream.createWriter();
+            List<LogInfo> listLogs = client.listLogs();
+            assert (listLogs.stream()
+                    .filter(log -> log.getLogName().getName().startsWith(logName.getName())).count() == 1);
+            List<LogServer> workers = cluster.getWorkers();
+            List<ByteBuffer> records = TestUtils.getRandomData(100, 10);
+            writer.write(records);
+            String location1 = "target/tmp/export_1/";
+            String location2 = "target/tmp/export_2/";
+            deleteLocalDirectory(new File(location1));
+            deleteLocalDirectory(new File(location2));
+            int startPosition1 = 3;
+            int startPosition2 = 5;
+            client.exportLog(logName, location1, startPosition1);
+            client.exportLog(logName, location2, startPosition2);
+            List<ArchivalInfo> infos = client.getExportStatus(logName);
+            int count = 0;
+            while (infos.size() > 1 && (
+                    infos.get(0).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED
+                            || infos.get(1).getStatus() != ArchivalInfo.ArchivalStatus.COMPLETED)
+                    && count < 10) {
+                infos = client.getExportStatus(logName);
+                ;
+                Thread.sleep(1000);
+                count++;
 
-        }
+            }
 
-        //Test ExportLogStream
-        LogStream exportLogStream = client.getExportLog(logName, location1);
-        LogReader reader = exportLogStream.createReader();
-        List<ByteBuffer> data = reader.readBulk(records.size());
-        assertEquals(records.size() - startPosition1, data.size());
-        reader.close();
-        exportLogStream = client.getExportLog(logName, location2);
-        reader = exportLogStream.createReader();
-        data = reader.readBulk(records.size());
-        assertEquals(records.size() - startPosition2, data.size());
-        reader.close();
-        writer.close();
+            //Test ExportLogStream
+            LogStream exportLogStream = client.getExportLog(logName, location1);
+            LogReader reader = exportLogStream.createReader();
+            List<ByteBuffer> data = reader.readBulk(records.size());
+            assertEquals(records.size() - startPosition1, data.size());
+            reader.close();
+            exportLogStream = client.getExportLog(logName, location2);
+            reader = exportLogStream.createReader();
+            data = reader.readBulk(records.size());
+            assertEquals(records.size() - startPosition2, data.size());
+            reader.close();
+            writer.close();
+        }
     }
 
     boolean deleteLocalDirectory(File dir) {
@@ -289,31 +300,40 @@ public class TestMetaServer {
     @Test
     public void testDeleteLog() throws Exception {
         // This should be LogServiceStream ?
-        LogStream logStream1 = client.createLog(LogName.of("testDeleteLog"));
-        assertNotNull(logStream1);
-        client.deleteLog(LogName.of("testDeleteLog"));
-        testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.DELETELOG.name(),
-            (long) deleteCount.get());
-        try {
-          logStream1 = client.getLog(LogName.of("testDeleteLog"));
-            fail("Failed to throw LogNotFoundException");
-        } catch(Exception e) {
-            assert(e instanceof LogNotFoundException);
+        try (LogStream logStream1 = client.createLog(LogName.of("testDeleteLog"))) {
+            assertNotNull(logStream1);
+            client.deleteLog(LogName.of("testDeleteLog"));
+            testJMXCount(MetaServiceProtos.MetaServiceRequestProto.TypeCase.DELETELOG.name(),
+                    (long) deleteCount.get());
+            LogStream logStream2 = null;
+            try {
+                logStream2 = client.getLog(LogName.of("testDeleteLog"));
+                fail("Failed to throw LogNotFoundException");
+            } catch (Exception e) {
+                assert (e instanceof LogNotFoundException);
+            } finally {
+                if (logStream2 != null) {
+                    logStream2.close();
+                }
+            }
         }
-
-
     }
     /**
      * Test for getting not existing log. Should throw an exception
      * @throws IOException
      */
     @Test
-    public void testGetNotExistingLog() {
+    public void testGetNotExistingLog() throws Exception {
+        LogStream logStream = null;
         try {
-            LogStream log = client.getLog(LogName.of("no_such_log"));
+            logStream = client.getLog(LogName.of("no_such_log"));
             fail("LogNotFoundException was not thrown");
         } catch (IOException e) {
             assert(e instanceof LogNotFoundException);
+        } finally {
+            if (logStream != null) {
+               logStream.close();
+            }
         }
     }
 
@@ -323,13 +343,30 @@ public class TestMetaServer {
      */
     @Test
     public void testAlreadyExistLog() throws Exception {
-        LogStream logStream1 = client.createLog(LogName.of("test1"));
-        assertNotNull(logStream1);
+        try (LogStream logStream1 = client.createLog(LogName.of("test1"))) {
+            assertNotNull(logStream1);
+            LogStream logStream2 = null;
+            try {
+                logStream2 = client.createLog(LogName.of("test1"));
+                fail("Didn't fail with LogAlreadyExistException");
+            } catch (IOException e) {
+                assert (e instanceof LogAlreadyExistException);
+            } finally {
+                if (logStream2 != null) {
+                    logStream2.close();
+                }
+            }
+        }
+    }
+
+    private void createLogAndClose(String name) throws Exception {
+        LogStream logStream = null;
         try {
-            logStream1 = client.createLog(LogName.of("test1"));
-            fail("Didn't fail with LogAlreadyExistException");
-        } catch (IOException e) {
-            assert(e instanceof LogAlreadyExistException);
+            logStream = client.createLog(LogName.of(name));
+        } finally {
+            if (logStream != null) {
+                logStream.close();
+            }
         }
     }
 
@@ -339,13 +376,13 @@ public class TestMetaServer {
      */
     @Test
     public void testListLogs() throws Exception {
-        client.createLog(LogName.of("listLogTest1"));
-        client.createLog(LogName.of("listLogTest2"));
-        client.createLog(LogName.of("listLogTest3"));
-        client.createLog(LogName.of("listLogTest4"));
-        client.createLog(LogName.of("listLogTest5"));
-        client.createLog(LogName.of("listLogTest6"));
-        client.createLog(LogName.of("listLogTest7"));
+        createLogAndClose("listLogTest1");
+        createLogAndClose("listLogTest2");
+        createLogAndClose("listLogTest3");
+        createLogAndClose("listLogTest4");
+        createLogAndClose("listLogTest5");
+        createLogAndClose("listLogTest6");
+        createLogAndClose("listLogTest7");
         // Test jmx
 
         List<LogInfo> list = client.listLogs();
@@ -380,10 +417,19 @@ public class TestMetaServer {
     @Test
     public void testFinalClieanUp() throws Exception {
         IntStream.range(0, 10).forEach(i -> {
+            LogStream logStream = null;
             try {
-                client.createLog(LogName.of("CleanTest" + i));
+                logStream = client.createLog(LogName.of("CleanTest" + i));
             } catch (IOException e) {
                 throw new RuntimeException(e);
+            } finally {
+                if (logStream != null) {
+                    try {
+                        logStream.close();
+                    } catch (Exception ignored) {
+                        LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+                    }
+                }
             }
         });
         List<LogInfo> list = client.listLogs();
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 30a1452..2f87cd8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -43,6 +43,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
@@ -51,8 +52,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import com.codahale.metrics.Gauge;
 
@@ -172,23 +171,40 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
     final int numMsgs = 10;
     final int numClients = 5;
     final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+    List<RaftClient> clients = new ArrayList<>();
 
-    // start several clients and write concurrently
-    final CountDownLatch latch = new CountDownLatch(1);
-    final List<Sender> senders = Stream.iterate(0, i -> i+1).limit(numClients)
-        .map(i -> new Sender(cluster.createClient(leaderId), numMsgs, latch))
-        .collect(Collectors.toList());
-    senders.forEach(Thread::start);
+    try {
+      List<Sender> senders = new ArrayList<>();
 
-    latch.countDown();
+      // start several clients and write concurrently
+      final CountDownLatch latch = new CountDownLatch(1);
 
-    for (Sender s : senders) {
-      s.join();
-      final Exception e = s.exception.get();
-      if (e != null) {
-        throw e;
+      for (int i = 0; i < numClients; i ++) {
+        RaftClient client = cluster.createClient(leaderId);
+        clients.add(client);
+        senders.add(new Sender(client, numMsgs, latch));
+      }
+
+      senders.forEach(Thread::start);
+
+      latch.countDown();
+
+      for (Sender s : senders) {
+        s.join();
+        final Exception e = s.exception.get();
+        if (e != null) {
+          throw e;
+        }
+        Assert.assertTrue(s.succeed.get());
+      }
+    } finally {
+      for (int i = 0; i < clients.size(); i ++) {
+        try {
+          clients.get(i).close();
+        } catch (Exception ignored) {
+          LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+        }
       }
-      Assert.assertTrue(s.succeed.get());
     }
 
     final ServerState leaderState = cluster.getLeader().getState();
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 6f44bfc..2a6b463 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -199,47 +199,48 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     int numMessages = RaftClientConfigKeys.Async.outstandingRequestsMax(getProperties());
     CompletableFuture[] futures = new CompletableFuture[numMessages + 1];
     final SimpleMessage[] messages = SimpleMessage.create(numMessages);
-    final RaftClient client = cluster.createClient();
-    //Set blockTransaction flag so that transaction blocks
-    cluster.getServers().stream()
-        .map(cluster::getRaftServerImpl)
-        .map(SimpleStateMachine4Testing::get)
-        .forEach(SimpleStateMachine4Testing::blockStartTransaction);
-
-    //Send numMessages which are blocked and do not release the client semaphore permits
-    AtomicInteger blockedRequestsCount = new AtomicInteger();
-    for (int i=0; i<numMessages; i++) {
-      blockedRequestsCount.getAndIncrement();
-      futures[i] = client.sendAsync(messages[i]);
-      blockedRequestsCount.decrementAndGet();
-    }
-    Assert.assertEquals(0, blockedRequestsCount.get());
-
-    futures[numMessages] = CompletableFuture.supplyAsync(() -> {
-      blockedRequestsCount.incrementAndGet();
-      client.sendAsync(new SimpleMessage("n1"));
-      blockedRequestsCount.decrementAndGet();
-      return null;
-    });
-
-    //Allow the last msg to be sent
-    while (blockedRequestsCount.get() != 1) {
-      Thread.sleep(1000);
-    }
-    Assert.assertEquals(1, blockedRequestsCount.get());
-    //Since all semaphore permits are acquired the last message sent is in queue
-    RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
-
-    //Unset the blockTransaction flag so that semaphore permits can be released
-    cluster.getServers().stream()
-        .map(cluster::getRaftServerImpl)
-        .map(SimpleStateMachine4Testing::get)
-        .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
-
-    for(int i=0; i<=numMessages; i++){
-      futures[i].join();
+    try (final RaftClient client = cluster.createClient()) {
+      //Set blockTransaction flag so that transaction blocks
+      cluster.getServers().stream()
+              .map(cluster::getRaftServerImpl)
+              .map(SimpleStateMachine4Testing::get)
+              .forEach(SimpleStateMachine4Testing::blockStartTransaction);
+
+      //Send numMessages which are blocked and do not release the client semaphore permits
+      AtomicInteger blockedRequestsCount = new AtomicInteger();
+      for (int i = 0; i < numMessages; i++) {
+        blockedRequestsCount.getAndIncrement();
+        futures[i] = client.sendAsync(messages[i]);
+        blockedRequestsCount.decrementAndGet();
+      }
+      Assert.assertEquals(0, blockedRequestsCount.get());
+
+      futures[numMessages] = CompletableFuture.supplyAsync(() -> {
+        blockedRequestsCount.incrementAndGet();
+        client.sendAsync(new SimpleMessage("n1"));
+        blockedRequestsCount.decrementAndGet();
+        return null;
+      });
+
+      //Allow the last msg to be sent
+      while (blockedRequestsCount.get() != 1) {
+        Thread.sleep(1000);
+      }
+      Assert.assertEquals(1, blockedRequestsCount.get());
+      //Since all semaphore permits are acquired the last message sent is in queue
+      RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
+
+      //Unset the blockTransaction flag so that semaphore permits can be released
+      cluster.getServers().stream()
+              .map(cluster::getRaftServerImpl)
+              .map(SimpleStateMachine4Testing::get)
+              .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
+
+      for (int i = 0; i <= numMessages; i++) {
+        futures[i].join();
+      }
+      Assert.assertEquals(0, blockedRequestsCount.get());
     }
-    Assert.assertEquals(0, blockedRequestsCount.get());
   }
 
   void runTestBasicAppendEntriesAsync(boolean killLeader) throws Exception {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 37d4e99..e3eec8c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -65,20 +65,20 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
     final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId();
     long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
 
-    final RaftClient client = cluster.createClient(leaderId);
-    final RaftClientRpc rpc = client.getClientRpc();
-    final long callId = 999;
-    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, new SimpleMessage("message"));
-    assertReply(rpc.sendRequest(r), client, callId);
-
-    // retry with the same callId
-    for (int i = 0; i < 5; i++) {
+    try (final RaftClient client = cluster.createClient(leaderId)) {
+      final RaftClientRpc rpc = client.getClientRpc();
+      final long callId = 999;
+      RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+              callId, new SimpleMessage("message"));
       assertReply(rpc.sendRequest(r), client, callId);
-    }
 
-    assertServer(cluster, client.getId(), callId, oldLastApplied);
-    client.close();
+      // retry with the same callId
+      for (int i = 0; i < 5; i++) {
+        assertReply(rpc.sendRequest(r), client, callId);
+      }
+
+      assertServer(cluster, client.getId(), callId, oldLastApplied);
+    }
   }
 
   public static RaftClient assertReply(RaftClientReply reply, RaftClient client, long callId) {
@@ -126,43 +126,43 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
     RaftTestUtil.waitForLeader(cluster);
     final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId();
 
-    final RaftClient client = cluster.createClient(leaderId);
-    RaftClientRpc rpc = client.getClientRpc();
-    final long callId = 999;
-    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, new SimpleMessage("message"));
-    assertReply(rpc.sendRequest(r), client, callId);
-    long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
-
-    // trigger the reconfiguration, make sure the original leader is kicked out
-    PeerChanges change = cluster.addNewPeers(2, true);
-    RaftPeer[] allPeers = cluster.removePeers(2, true,
-        asList(change.newPeers)).allPeersInNewConf;
-    // trigger setConfiguration
-    cluster.setConfiguration(allPeers);
-
-    final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
-      final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
-      Assert.assertNotEquals(leaderId, id);
-      return id;
-    }, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "wait for a leader different than " + leaderId, LOG);
-    Assert.assertNotEquals(leaderId, newLeaderId);
-    // same clientId and callId in the request
-    r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
-        callId, new SimpleMessage("message"));
-    rpc.addServers(Arrays.asList(change.newPeers));
-    for (int i = 0; i < 10; i++) {
-      try {
-        assertReply(rpc.sendRequest(r), client, callId);
-        LOG.info("successfully sent out the retry request_" + i);
-      } catch (Exception e) {
-        LOG.info("hit exception while retrying the same request: " + r, e);
+    try (final RaftClient client = cluster.createClient(leaderId)) {
+      RaftClientRpc rpc = client.getClientRpc();
+      final long callId = 999;
+      RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+              callId, new SimpleMessage("message"));
+      assertReply(rpc.sendRequest(r), client, callId);
+      long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+
+      // trigger the reconfiguration, make sure the original leader is kicked out
+      PeerChanges change = cluster.addNewPeers(2, true);
+      RaftPeer[] allPeers = cluster.removePeers(2, true,
+              asList(change.newPeers)).allPeersInNewConf;
+      // trigger setConfiguration
+      cluster.setConfiguration(allPeers);
+
+      final RaftPeerId newLeaderId = JavaUtils.attemptRepeatedly(() -> {
+        final RaftPeerId id = RaftTestUtil.waitForLeader(cluster).getId();
+        Assert.assertNotEquals(leaderId, id);
+        return id;
+      }, 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS), "wait for a leader different than " + leaderId, LOG);
+      Assert.assertNotEquals(leaderId, newLeaderId);
+      // same clientId and callId in the request
+      r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
+              callId, new SimpleMessage("message"));
+      rpc.addServers(Arrays.asList(change.newPeers));
+      for (int i = 0; i < 10; i++) {
+        try {
+          assertReply(rpc.sendRequest(r), client, callId);
+          LOG.info("successfully sent out the retry request_" + i);
+        } catch (Exception e) {
+          LOG.info("hit exception while retrying the same request: " + r, e);
+        }
+        Thread.sleep(100);
       }
-      Thread.sleep(100);
-    }
 
-    // check the new leader and make sure the retry did not get committed
-    Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
-    client.close();
+      // check the new leader and make sure the retry did not get committed
+      Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
+    }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 0ad0906..acc26ab 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -85,9 +85,10 @@ public abstract class GroupManagementBaseTest extends BaseTest {
     // Add groups
     final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), cluster.getPeers());
     LOG.info("add new group: " + newGroup);
-    final RaftClient client = cluster.createClient(newGroup);
-    for(RaftPeer p : newGroup.getPeers()) {
-      client.groupAdd(newGroup, p.getId());
+    try (final RaftClient client = cluster.createClient(newGroup)) {
+      for (RaftPeer p : newGroup.getPeers()) {
+        client.groupAdd(newGroup, p.getId());
+      }
     }
     Assert.assertNotNull(RaftTestUtil.waitForLeader(cluster));
     TimeUnit.SECONDS.sleep(1);
@@ -241,16 +242,17 @@ public abstract class GroupManagementBaseTest extends BaseTest {
     final RaftPeer peer = cluster.getPeers().get(0);
     final RaftPeerId peerId = peer.getId();
     final RaftGroup group = RaftGroup.valueOf(cluster.getGroupId(), peer);
-    final RaftClient client = cluster.createClient();
-    Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
-    try {
-      client.groupAdd(group, peer.getId());
-    } catch (IOException ex) {
-      // HadoopRPC throws RemoteException, which makes it hard to check if
-      // the exception is instance of AlreadyExistsException
-      Assert.assertTrue(ex.toString().contains(AlreadyExistsException.class.getCanonicalName()));
+    try (final RaftClient client = cluster.createClient()) {
+      Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
+      try {
+        client.groupAdd(group, peer.getId());
+      } catch (IOException ex) {
+        // HadoopRPC throws RemoteException, which makes it hard to check if
+        // the exception is instance of AlreadyExistsException
+        Assert.assertTrue(ex.toString().contains(AlreadyExistsException.class.getCanonicalName()));
+      }
+      Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
+      cluster.shutdown();
     }
-    Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
-    cluster.shutdown();
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index f7545bc..1f83cac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -142,63 +142,63 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
 
   void runTestReconfTwice(CLUSTER cluster) throws Exception {
       final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
-      final RaftClient client = cluster.createClient(leaderId);
+      try (final RaftClient client = cluster.createClient(leaderId)) {
 
-      // submit some msgs before reconf
-      for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
-        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
-        Assert.assertTrue(reply.isSuccess());
-      }
-
-      final AtomicBoolean reconf1 = new AtomicBoolean(false);
-      final AtomicBoolean reconf2 = new AtomicBoolean(false);
-      final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null);
-      final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null);
-      CountDownLatch latch = new CountDownLatch(1);
-      Thread clientThread = new Thread(() -> {
-        try {
-          PeerChanges c1 = cluster.addNewPeers(2, true);
-          LOG.info("Start changing the configuration: {}",
-              asList(c1.allPeersInNewConf));
-
-          RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
-          reconf1.set(reply.isSuccess());
-
-          PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
-          finalPeers.set(c2.allPeersInNewConf);
-          deadPeers.set(c2.removedPeers);
-
-          LOG.info("Start changing the configuration again: {}",
-              asList(c2.allPeersInNewConf));
-          reply = client.setConfiguration(c2.allPeersInNewConf);
-          reconf2.set(reply.isSuccess());
-
-          latch.countDown();
-          client.close();
-        } catch(Exception ignored) {
-          LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+        // submit some msgs before reconf
+        for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
+          RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
         }
-      });
-      clientThread.start();
 
-      latch.await();
-      Assert.assertTrue(reconf1.get());
-      Assert.assertTrue(reconf2.get());
-      waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
-      final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId();
-
-      // check configuration manager's internal state
-      // each reconf will generate two configurations: (old, new) and (new)
-      cluster.getServerAliveStream().forEach(server -> {
-        ConfigurationManager confManager =
-            (ConfigurationManager) Whitebox.getInternalState(server.getState(),
-                "configurationManager");
+        final AtomicBoolean reconf1 = new AtomicBoolean(false);
+        final AtomicBoolean reconf2 = new AtomicBoolean(false);
+        final AtomicReference<RaftPeer[]> finalPeers = new AtomicReference<>(null);
+        final AtomicReference<RaftPeer[]> deadPeers = new AtomicReference<>(null);
+        CountDownLatch latch = new CountDownLatch(1);
+        Thread clientThread = new Thread(() -> {
+          try {
+            PeerChanges c1 = cluster.addNewPeers(2, true);
+            LOG.info("Start changing the configuration: {}",
+                    asList(c1.allPeersInNewConf));
+
+            RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
+            reconf1.set(reply.isSuccess());
+
+            PeerChanges c2 = cluster.removePeers(2, true, asList(c1.newPeers));
+            finalPeers.set(c2.allPeersInNewConf);
+            deadPeers.set(c2.removedPeers);
+
+            LOG.info("Start changing the configuration again: {}",
+                    asList(c2.allPeersInNewConf));
+            reply = client.setConfiguration(c2.allPeersInNewConf);
+            reconf2.set(reply.isSuccess());
+
+            latch.countDown();
+          } catch (Exception ignored) {
+            LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
+          }
+        });
+        clientThread.start();
+
+        latch.await();
+        Assert.assertTrue(reconf1.get());
+        Assert.assertTrue(reconf2.get());
+        waitAndCheckNewConf(cluster, finalPeers.get(), 2, null);
+        final RaftPeerId leader2 = RaftTestUtil.waitForLeader(cluster).getId();
+
+        // check configuration manager's internal state
         // each reconf will generate two configurations: (old, new) and (new)
-        // each leader change generates one configuration.
-        // expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
-        final int expectedConf = leader2.equals(leaderId)? 6: 7;
-        Assert.assertEquals(server.getId() + ": " + confManager, expectedConf, confManager.numOfConf());
-      });
+        cluster.getServerAliveStream().forEach(server -> {
+          ConfigurationManager confManager =
+                  (ConfigurationManager) Whitebox.getInternalState(server.getState(),
+                          "configurationManager");
+          // each reconf will generate two configurations: (old, new) and (new)
+          // each leader change generates one configuration.
+          // expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
+          final int expectedConf = leader2.equals(leaderId) ? 6 : 7;
+          Assert.assertEquals(server.getId() + ": " + confManager, expectedConf, confManager.numOfConf());
+        });
+      }
   }
 
   @Test
@@ -258,39 +258,39 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
   void runTestBootstrapReconf(CLUSTER cluster) throws Exception {
       RaftTestUtil.waitForLeader(cluster);
       final RaftPeerId leaderId = cluster.getLeader().getId();
-      final RaftClient client = cluster.createClient(leaderId);
-
-      // submit some msgs before reconf
-      for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
-        RaftClientReply reply = client.send(new SimpleMessage("m" + i));
-        Assert.assertTrue(reply.isSuccess());
-      }
+      try (final RaftClient client = cluster.createClient(leaderId)) {
 
-      PeerChanges c1 = cluster.addNewPeers(2, true);
-      LOG.info("Start changing the configuration: {}",
-          asList(c1.allPeersInNewConf));
-      final AtomicReference<Boolean> success = new AtomicReference<>();
-
-      Thread clientThread = new Thread(() -> {
-        try {
-          RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
-          success.set(reply.isSuccess());
-          client.close();
-        } catch (IOException ioe) {
-          LOG.error("FAILED", ioe);
+        // submit some msgs before reconf
+        for (int i = 0; i < STAGING_CATCHUP_GAP * 2; i++) {
+          RaftClientReply reply = client.send(new SimpleMessage("m" + i));
+          Assert.assertTrue(reply.isSuccess());
         }
-      });
-      clientThread.start();
 
-      Thread.sleep(5000);
-      LOG.info(cluster.printServers());
-      assertSuccess(success);
+        PeerChanges c1 = cluster.addNewPeers(2, true);
+        LOG.info("Start changing the configuration: {}",
+                asList(c1.allPeersInNewConf));
+        final AtomicReference<Boolean> success = new AtomicReference<>();
+
+        Thread clientThread = new Thread(() -> {
+          try {
+            RaftClientReply reply = client.setConfiguration(c1.allPeersInNewConf);
+            success.set(reply.isSuccess());
+          } catch (IOException ioe) {
+            LOG.error("FAILED", ioe);
+          }
+        });
+        clientThread.start();
+
+        Thread.sleep(5000);
+        LOG.info(cluster.printServers());
+        assertSuccess(success);
 
-      final RaftLog leaderLog = cluster.getLeader().getState().getLog();
-      for (RaftPeer newPeer : c1.newPeers) {
-        Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
-            cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
-                .getEntries(0, Long.MAX_VALUE));
+        final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+        for (RaftPeer newPeer : c1.newPeers) {
+          Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
+                  cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
+                          .getEntries(0, Long.MAX_VALUE));
+        }
       }
   }
 
@@ -577,26 +577,29 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
       AtomicBoolean success = new AtomicBoolean(false);
       final RaftPeerId leaderId = cluster.getPeers().iterator().next().getId();
       new Thread(() -> {
-        final RaftClient client = cluster.createClient(leaderId);
-        final RaftClientRpc sender = client.getClientRpc();
-        final RaftClientRequest request = cluster.newRaftClientRequest(
-            client.getId(), leaderId, new SimpleMessage("test"));
-        while (!success.get()) {
-          try {
-            final RaftClientReply reply = sender.sendRequest(request);
-            success.set(reply.isSuccess());
-            if (reply.getException() != null && reply.getException() instanceof LeaderNotReadyException) {
-              caughtNotReady.set(true);
-            }
-          } catch (IOException e) {
-            LOG.info("Hit other IOException", e);
-          }
-          if (!success.get()) {
+        try (final RaftClient client = cluster.createClient(leaderId)) {
+          final RaftClientRpc sender = client.getClientRpc();
+          final RaftClientRequest request = cluster.newRaftClientRequest(
+                  client.getId(), leaderId, new SimpleMessage("test"));
+          while (!success.get()) {
             try {
-              Thread.sleep(200);
-            } catch (InterruptedException ignored) {
+              final RaftClientReply reply = sender.sendRequest(request);
+              success.set(reply.isSuccess());
+              if (reply.getException() != null && reply.getException() instanceof LeaderNotReadyException) {
+                caughtNotReady.set(true);
+              }
+            } catch (IOException e) {
+              LOG.info("Hit other IOException", e);
+            }
+            if (!success.get()) {
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException ignored) {
+              }
             }
           }
+        } catch (Exception ignored) {
+          LOG.warn(ignored.getClass().getSimpleName() + " is ignored", ignored);
         }
       }).start();
 
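Where the client is shared with a worker thread, as in runTestReconfTwice and runTestBootstrapReconf above, the explicit client.close() inside the thread is removed and ownership moves to the enclosing try-with-resources; the latch.await() (or the assertions that follow it) keeps the main thread inside the try block until the worker is done with the client. A rough sketch of that coordination, reusing names from the hunks above (the finally around countDown() is an extra safeguard, not taken verbatim from the patch):

    try (RaftClient client = cluster.createClient(leaderId)) {
      final CountDownLatch latch = new CountDownLatch(1);
      new Thread(() -> {
        try {
          client.send(new SimpleMessage("m"));  // worker uses the shared client
        } catch (Exception e) {
          LOG.warn(e.getClass().getSimpleName() + " is ignored", e);
        } finally {
          latch.countDown();
        }
      }).start();
      latch.await();  // keep the client open until the worker is finished
    }
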
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index e94b43d..329c8a3 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -105,39 +105,39 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     cluster.getLeaderAndSendFirstMessage(true);
     long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
 
-    final RaftClient client = cluster.createClient(leaderId);
-    final RaftClientRpc rpc = client.getClientRpc();
-    final long callId = 999;
-    final SimpleMessage message = new SimpleMessage("message");
-    final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, message);
-    RaftClientReply reply = rpc.sendRequest(r);
-    Assert.assertFalse(reply.isSuccess());
-    Assert.assertNotNull(reply.getStateMachineException());
-
-    // retry with the same callId
-    for (int i = 0; i < 5; i++) {
-      reply = rpc.sendRequest(r);
-      Assert.assertEquals(client.getId(), reply.getClientId());
-      Assert.assertEquals(callId, reply.getCallId());
+    try (final RaftClient client = cluster.createClient(leaderId)) {
+      final RaftClientRpc rpc = client.getClientRpc();
+      final long callId = 999;
+      final SimpleMessage message = new SimpleMessage("message");
+      final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, message);
+      RaftClientReply reply = rpc.sendRequest(r);
       Assert.assertFalse(reply.isSuccess());
       Assert.assertNotNull(reply.getStateMachineException());
-    }
 
-    long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
-    // make sure retry cache has the entry
-    for (RaftServerImpl server : cluster.iterateServerImpls()) {
-      LOG.info("check server " + server.getId());
-      if (server.getState().getLastAppliedIndex() < leaderApplied) {
-        Thread.sleep(1000);
+      // retry with the same callId
+      for (int i = 0; i < 5; i++) {
+        reply = rpc.sendRequest(r);
+        Assert.assertEquals(client.getId(), reply.getClientId());
+        Assert.assertEquals(callId, reply.getCallId());
+        Assert.assertFalse(reply.isSuccess());
+        Assert.assertNotNull(reply.getStateMachineException());
       }
-      Assert.assertNotNull(
-          RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
-      final RaftLog log = server.getState().getLog();
-      RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
-    }
 
-    client.close();
-    cluster.shutdown();
+      long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+      // make sure retry cache has the entry
+      for (RaftServerImpl server : cluster.iterateServerImpls()) {
+        LOG.info("check server " + server.getId());
+        if (server.getState().getLastAppliedIndex() < leaderApplied) {
+          Thread.sleep(1000);
+        }
+        Assert.assertNotNull(
+                RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+        final RaftLog log = server.getState().getLog();
+        RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
+      }
+
+      cluster.shutdown();
+    }
   }
 
   @Test
@@ -150,31 +150,31 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     cluster.getLeaderAndSendFirstMessage(true);
     // turn on the preAppend failure switch
     failPreAppend = true;
-    final RaftClient client = cluster.createClient(oldLeader.getId());
-    final RaftClientRpc rpc = client.getClientRpc();
-    final long callId = 999;
-    final SimpleMessage message = new SimpleMessage("message");
-    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), oldLeader.getId(), callId, message);
-    RaftClientReply reply = rpc.sendRequest(r);
-    Objects.requireNonNull(reply.getStateMachineException());
-
-    final RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(oldLeader, client.getId(), callId);
-    Assert.assertNotNull(oldEntry);
-    Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
-
-    // At this point of time the old leader would have stepped down. wait for leader election to complete
-    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
-    // retry
-    r = cluster.newRaftClientRequest(client.getId(), leader.getId(), callId, message);
-    reply = rpc.sendRequest(r);
-    Objects.requireNonNull(reply.getStateMachineException());
-
-    RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
-        leader, client.getId(), callId);
-    Assert.assertNotNull(currentEntry);
-    Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
-    Assert.assertNotEquals(oldEntry, currentEntry);
-    failPreAppend = false;
-    client.close();
+    try (final RaftClient client = cluster.createClient(oldLeader.getId())) {
+      final RaftClientRpc rpc = client.getClientRpc();
+      final long callId = 999;
+      final SimpleMessage message = new SimpleMessage("message");
+      RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), oldLeader.getId(), callId, message);
+      RaftClientReply reply = rpc.sendRequest(r);
+      Objects.requireNonNull(reply.getStateMachineException());
+
+      final RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(oldLeader, client.getId(), callId);
+      Assert.assertNotNull(oldEntry);
+      Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
+
+      // At this point the old leader would have stepped down. Wait for the leader election to complete.
+      final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+      // retry
+      r = cluster.newRaftClientRequest(client.getId(), leader.getId(), callId, message);
+      reply = rpc.sendRequest(r);
+      Objects.requireNonNull(reply.getStateMachineException());
+
+      RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
+              leader, client.getId(), callId);
+      Assert.assertNotNull(currentEntry);
+      Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
+      Assert.assertNotEquals(oldEntry, currentEntry);
+      failPreAppend = false;
+    }
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 7858f5a..9c1fe09 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -89,44 +89,43 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
 
     cluster.getLeaderAndSendFirstMessage(true);
 
-    final RaftClient client = cluster.createClient(leaderId);
-    client.send(new RaftTestUtil.SimpleMessage("message"));
-    RaftClientReply reply = client.send(
-        new RaftTestUtil.SimpleMessage("message2"));
-
-    long logIndex = reply.getLogIndex();
-    //Confirm that followers have committed
-    RaftClientReply watchReply = client.sendWatch(
-        logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
-    watchReply.getCommitInfos().forEach(
-        val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
-
-    RaftServerImpl secondFollower = cluster.getFollowers().get(1);
-    // Second follower is blocked in apply transaction
-    Assert.assertTrue(
-        secondFollower.getState().getLastAppliedIndex()
-            < logIndex);
-
-    // Now shutdown the follower in a separate thread
-    Thread t = new Thread(() -> secondFollower.shutdown(true));
-    t.start();
-
-    // The second follower should still be blocked in apply transaction
-    Assert.assertTrue(
-        secondFollower.getState().getLastAppliedIndex()
-            < logIndex);
-
-    // Now unblock the second follower
-    ((StateMachineWithConditionalWait)secondFollower.getStateMachine())
-        .unBlockApplyTxn();
-
-    // Now wait for the thread
-    t.join(5000);
-    Assert.assertEquals(
-        secondFollower.getState().getLastAppliedIndex(),
-        logIndex);
-
-    client.close();
-    cluster.shutdown();
+    try (final RaftClient client = cluster.createClient(leaderId)) {
+      client.send(new RaftTestUtil.SimpleMessage("message"));
+      RaftClientReply reply = client.send(
+              new RaftTestUtil.SimpleMessage("message2"));
+
+      long logIndex = reply.getLogIndex();
+      // Confirm that followers have committed
+      RaftClientReply watchReply = client.sendWatch(
+              logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+      watchReply.getCommitInfos().forEach(
+              val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
+      RaftServerImpl secondFollower = cluster.getFollowers().get(1);
+      // Second follower is blocked in apply transaction
+      Assert.assertTrue(
+              secondFollower.getState().getLastAppliedIndex()
+                      < logIndex);
+
+      // Now shutdown the follower in a separate thread
+      Thread t = new Thread(() -> secondFollower.shutdown(true));
+      t.start();
+
+      // The second follower should still be blocked in apply transaction
+      Assert.assertTrue(
+              secondFollower.getState().getLastAppliedIndex()
+                      < logIndex);
+
+      // Now unblock the second follower
+      ((StateMachineWithConditionalWait) secondFollower.getStateMachine())
+              .unBlockApplyTxn();
+
+      // Now wait for the thread
+      t.join(5000);
+      Assert.assertEquals(
+              secondFollower.getState().getLastAppliedIndex(),
+              logIndex);
+
+      cluster.shutdown();
+    }
   }
 }
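
Note the ordering this pattern implies: with cluster.shutdown() moved inside the try block, client.close() runs after the servers are already down. That works because close() only needs to release client-side resources such as the gRPC channel; it does not have to reach the servers. Schematically:

    try (RaftClient client = cluster.createClient(leaderId)) {
      // ... send requests and assert on the replies ...
      cluster.shutdown();
    }  // client.close() executes here, after the cluster is gone
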
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 50c1c13..18dab32 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -62,33 +62,34 @@ public class TestLogAppenderWithGrpc
     cluster.start();
 
     // client and leader setup
-    RaftClient client = cluster.createClient(cluster.getGroup());
-    client.send(new RaftTestUtil.SimpleMessage("m"));
-    RaftServerImpl leader = waitForLeader(cluster);
-    long initialNextIndex = leader.getState().getNextIndex();
-
-    for (RaftServerImpl server : cluster.getFollowers()) {
-      // block the appends in the follower
-      ((SimpleStateMachine4Testing)server.getStateMachine()).blockWriteStateMachineData();
-    }
-    Collection<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(maxAppends * 2);
-    for (int i = 0; i < maxAppends * 2; i++) {
-      futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage("m")));
-    }
+    try (final RaftClient client = cluster.createClient(cluster.getGroup())) {
+      client.send(new RaftTestUtil.SimpleMessage("m"));
+      RaftServerImpl leader = waitForLeader(cluster);
+      long initialNextIndex = leader.getState().getNextIndex();
+
+      for (RaftServerImpl server : cluster.getFollowers()) {
+        // block the appends in the follower
+        ((SimpleStateMachine4Testing) server.getStateMachine()).blockWriteStateMachineData();
+      }
+      Collection<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(maxAppends * 2);
+      for (int i = 0; i < maxAppends * 2; i++) {
+        futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage("m")));
+      }
 
-    FIVE_SECONDS.sleep();
-    for (long nextIndex : leader.getFollowerNextIndices()) {
-      // Verify nextIndex does not progress due to pendingRequests limit
-      Assert.assertEquals(initialNextIndex + maxAppends, nextIndex);
-    }
-    ONE_SECOND.sleep();
-    for (RaftServerImpl server : cluster.getFollowers()) {
-      // unblock the appends in the follower
-      ((SimpleStateMachine4Testing)server.getStateMachine()).unblockWriteStateMachineData();
-    }
+      FIVE_SECONDS.sleep();
+      for (long nextIndex : leader.getFollowerNextIndices()) {
+        // Verify nextIndex does not progress due to pendingRequests limit
+        Assert.assertEquals(initialNextIndex + maxAppends, nextIndex);
+      }
+      ONE_SECOND.sleep();
+      for (RaftServerImpl server : cluster.getFollowers()) {
+        // unblock the appends in the follower
+        ((SimpleStateMachine4Testing) server.getStateMachine()).unblockWriteStateMachineData();
+      }
 
-    JavaUtils.allOf(futures).join();
-    cluster.shutdown();
+      JavaUtils.allOf(futures).join();
+      cluster.shutdown();
+    }
   }
 
   @Test
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index aa1cca8..85d1171 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -66,7 +66,9 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 import java.util.SortedMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -219,37 +221,50 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
     String message = "2nd Message";
     // Block stateMachine flush data, so that 2nd request will not be
     // completed, and so it will not be removed from pending request map.
-    cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()).sendAsync(new SimpleMessage(message));
+    List<RaftClient> clients = new ArrayList<>();
 
-   final SortedMap< String, Gauge > gaugeMap =
-        cluster.getLeader().getRaftServerMetrics().getRegistry()
-            .getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
+    try {
+      RaftClient client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+      clients.add(client);
+      client.sendAsync(new SimpleMessage(message));
 
-    RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
-        300, 5000);
 
+      final SortedMap<String, Gauge> gaugeMap =
+              cluster.getLeader().getRaftServerMetrics().getRegistry()
+                      .getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
 
-    for (int i = 0; i < 10; i++) {
-      cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry()).sendAsync(new SimpleMessage(message));
-    }
+      RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
+              300, 5000);
+
+      for (int i = 0; i < 10; i++) {
+        client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+        clients.add(client);
+        client.sendAsync(new SimpleMessage(message));
+      }
 
-    // Because we have passed 11 requests, and the element queue size is 10.
-    RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics().getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER)
-        .getCount() == 1, 300, 5000);
+      // Because we have sent 11 requests and the element queue size is only 10.
+      RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics().getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER)
+              .getCount() == 1, 300, 5000);
 
-    stateMachine.unblockFlushStateMachineData();
+      stateMachine.unblockFlushStateMachineData();
 
-    // Send a message with 120, our byte size limit is 110, so it should fail
-    // and byte size counter limit will be hit.
+      // Send a message of 120 characters; our byte size limit is 110, so it
+      // should fail and the byte size limit counter will be hit.
 
-    cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry())
-        .sendAsync(new SimpleMessage(RandomStringUtils.random(120, true, false)));
+      client = cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
+      client.sendAsync(new SimpleMessage(RandomStringUtils.random(120, true, false)));
+      clients.add(client);
 
-    RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
-        .getCounter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
+      RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
+              .getCounter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
 
-    Assert.assertEquals(2, cluster.getLeader().getRaftServerMetrics()
-        .getCounter(RESOURCE_LIMIT_HIT_COUNTER).getCount());
+      Assert.assertEquals(2, cluster.getLeader().getRaftServerMetrics()
+              .getCounter(RESOURCE_LIMIT_HIT_COUNTER).getCount());
+    } finally {
+      for (RaftClient client : clients) {
+        client.close();
+      }
+    }
   }
 
 
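The TestRaftServerWithGrpc change creates clients in a loop, which a single try-with-resources cannot cover, so the patch collects every client in a list and closes them all in a finally block. Reduced to its essentials, with names taken from the hunk above (assuming the enclosing method declares throws IOException):

    final List<RaftClient> clients = new ArrayList<>();
    try {
      for (int i = 0; i < 10; i++) {
        final RaftClient client =
            cluster.createClient(cluster.getLeader().getId(), RetryPolicies.noRetry());
        clients.add(client);  // register before use, so every exit path closes it
        client.sendAsync(new SimpleMessage(message));
      }
      // ... assertions on the server metrics ...
    } finally {
      for (RaftClient client : clients) {
        client.close();
      }
    }
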
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 3d374a8..e96a418 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -184,13 +184,14 @@ public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.Fact
       builder.setDefaultPolicy(RetryPolicies.retryForeverNoSleep());
 
       // create a client with the exception dependent policy
-      RaftClient client = cluster.createClient(builder.build());
-      client.sendAsync(new RaftTestUtil.SimpleMessage("1")).get();
+      try (final RaftClient client = cluster.createClient(builder.build())) {
+        client.sendAsync(new RaftTestUtil.SimpleMessage("1")).get();
 
-      leader = cluster.getLeader();
-      ((SimpleStateMachine4Testing)leader.getStateMachine()).blockWriteStateMachineData();
+        leader = cluster.getLeader();
+        ((SimpleStateMachine4Testing) leader.getStateMachine()).blockWriteStateMachineData();
 
-      client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();
+        client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();
+      }
       Assert.fail("Test should have failed.");
     } catch (ExecutionException e) {
       RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause();
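
In TestExceptionDependentRetry the try-with-resources nests inside the outer try/catch that the test already uses to assert failure: when sendAsync(...).get() throws, the client is closed first, and the ExecutionException then propagates outward. The control flow, in outline:

    try {
      try (RaftClient client = cluster.createClient(builder.build())) {
        client.sendAsync(new RaftTestUtil.SimpleMessage("2")).get();  // expected to fail
      }  // close() runs before the exception reaches the outer catch
      Assert.fail("Test should have failed.");
    } catch (ExecutionException e) {
      // the client is already released by the time the cause is inspected
      Assert.assertTrue(e.getCause() instanceof RaftRetryFailureException);
    }
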
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index ab277de..9a6c816 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -237,16 +237,14 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
       futures.add(f);
 
       final SimpleMessage m = messages[i];
-      try (final RaftClient client = cluster.createClient()) {
-        new Thread(() -> {
-          try {
-            Assert.assertTrue(client.send(m).isSuccess());
-          } catch (IOException e) {
-            throw new IllegalStateException("Failed to send " + m, e);
-          }
-          f.complete(null);
-        }).start();
-      }
+      new Thread(() -> {
+        try (final RaftClient client = cluster.createClient()) {
+          Assert.assertTrue(client.send(m).isSuccess());
+        } catch (IOException e) {
+          throw new IllegalStateException("Failed to send " + m, e);
+        }
+        f.complete(null);
+      }).start();
     }
     JavaUtils.allOf(futures).get();
 
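The ServerRestartTests hunk fixes more than a leak. Previously the try-with-resources lived on the spawning thread, so the block exited, and the client was closed, immediately after Thread.start(), racing with the send the worker was about to perform. Moving the try-with-resources into the Runnable ties the client's lifetime to its actual use. The broken before-shape, annotated (send(client) is a stand-in for the worker's body):

    // Before (broken): the try block exits right after start(), closing the
    // client while the worker thread may still be using it.
    try (RaftClient client = cluster.createClient()) {
      new Thread(() -> send(client)).start();
    }  // close() here races with the worker's send
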
diff --git a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 174ba8c..d0b486a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -185,9 +185,10 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim
       for(RaftGroupId gid : registry.keySet()) {
         final RaftGroup newGroup = RaftGroup.valueOf(gid, cluster.getPeers());
         LOG.info("add new group: " + newGroup);
-        final RaftClient client = cluster.createClient(newGroup);
-        for(RaftPeer p : newGroup.getPeers()) {
-          client.groupAdd(newGroup, p.getId());
+        try (final RaftClient client = cluster.createClient(newGroup)) {
+          for (RaftPeer p : newGroup.getPeers()) {
+            client.groupAdd(newGroup, p.getId());
+          }
         }
       }