You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/03 10:50:37 UTC

[incubator-ratis] branch master updated: RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private. (#318)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ef95e2c  RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private. (#318)
ef95e2c is described below

commit ef95e2cad987d66cc094f4f5966f2594ab78d04d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 3 18:50:27 2020 +0800

    RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private. (#318)
    
    * RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private.
    
    * Add @SuppressWarnings("parameternumber") for a checkstyle warning
---
 .../java/org/apache/ratis/server/RaftServer.java   |  9 ++++--
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  2 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   | 25 +++++++--------
 .../apache/ratis/server/impl/RaftServerProxy.java  | 34 ++++++++++++--------
 .../org/apache/ratis/server/impl/ServerState.java  |  6 +++-
 .../ratis/server/impl/StateMachineUpdater.java     |  2 +-
 .../server/raftlog/segmented/SegmentedRaftLog.java | 34 ++++++--------------
 .../raftlog/segmented/SegmentedRaftLogWorker.java  | 23 +++++---------
 .../test/java/org/apache/ratis/RaftAsyncTests.java |  1 -
 .../test/java/org/apache/ratis/RaftBasicTests.java |  2 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  | 20 +++++++-----
 .../ratis/server/impl/RaftServerTestUtil.java      | 36 +++++++++++++++++++---
 .../ratis/server/impl/RetryCacheTestUtil.java      |  5 +--
 .../server/impl/StateMachineShutdownTests.java     |  2 +-
 .../segmented/SegmentedRaftLogTestUtils.java       |  1 -
 .../ratis/datastream/DataStreamBaseTest.java       |  3 ++
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 13 ++++++--
 .../raftlog/segmented/TestCacheEviction.java       |  7 ++---
 .../raftlog/segmented/TestSegmentedRaftLog.java    | 33 ++++++++------------
 19 files changed, 143 insertions(+), 115 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 7c7d426..c4b9b0f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -44,8 +44,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
     AdminProtocol, AdminAsynchronousProtocol {
   Logger LOG = LoggerFactory.getLogger(RaftServer.class);
 
-  /** A division of a {@link RaftServer} for a particular group. */
-  interface Division {
+  /** A division of a {@link RaftServer} for a particular {@link RaftGroup}. */
+  interface Division extends Closeable {
     Logger LOG = LoggerFactory.getLogger(Division.class);
 
     /** @return the {@link RaftGroupMemberId} for this division. */
@@ -63,6 +63,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
           .orElseGet(() -> getRaftServer().getPeer());
     }
 
+    /** @return the information about this division. */
     DivisionInfo getInfo();
 
     /** @return the {@link RaftGroup} for this division. */
@@ -80,7 +81,11 @@ public interface RaftServer extends Closeable, RpcType.Get,
     /** @return the data stream map of this division. */
     DataStreamMap getDataStreamMap();
 
+    /** @return the internal {@link RaftClient} of this division. */
     RaftClient getRaftClient();
+
+    @Override
+    void close();
   }
 
   /** @return the server ID. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index d50b26f..1543dc3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -773,7 +773,7 @@ class LeaderStateImpl implements LeaderState {
           }
           // the pending request handler will send NotLeaderException for
           // pending client requests when it stops
-          server.shutdown();
+          server.close();
         }
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8206f6e..b90fe57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -83,7 +83,7 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
 import com.codahale.metrics.Timer;
 
-public class RaftServerImpl implements RaftServer.Division,
+class RaftServerImpl implements RaftServer.Division,
     RaftServerProtocol, RaftServerAsynchronousProtocol,
     RaftClientProtocol, RaftClientAsynchronousProtocol {
   private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
@@ -270,7 +270,7 @@ public class RaftServerImpl implements RaftServer.Division,
     return proxy;
   }
 
-  public RaftServerRpc getServerRpc() {
+  RaftServerRpc getServerRpc() {
     return proxy.getServerRpc();
   }
 
@@ -327,7 +327,7 @@ public class RaftServerImpl implements RaftServer.Division,
     // do not start FollowerState
   }
 
-  public ServerState getState() {
+  ServerState getState() {
     return state;
   }
 
@@ -365,7 +365,7 @@ public class RaftServerImpl implements RaftServer.Division,
     final RaftStorageDirectory dir = state.getStorage().getStorageDir();
 
     /* Shutdown is triggered here inorder to avoid any locked files. */
-    shutdown();
+    close();
     getStateMachine().event().notifyGroupRemove();
     if (deleteDirectory) {
       for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) {
@@ -398,7 +398,8 @@ public class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  public void shutdown() {
+  @Override
+  public void close() {
     lifeCycle.checkStateAndClose(() -> {
       LOG.info("{}: shutdown", getMemberId());
       try {
@@ -699,7 +700,7 @@ public class RaftServerImpl implements RaftServer.Division,
     return pending.getFuture();
   }
 
-  public void stepDownOnJvmPause() {
+  void stepDownOnJvmPause() {
     if (getInfo().isLeader()) {
       role.getLeaderState().ifPresent(leader -> leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
     }
@@ -1282,7 +1283,7 @@ public class RaftServerImpl implements RaftServer.Division,
     return reply;
   }
 
-  public boolean pause() throws IOException {
+  boolean pause() throws IOException {
     // TODO: should pause() be limited on only working for a follower?
 
     // Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
@@ -1299,7 +1300,7 @@ public class RaftServerImpl implements RaftServer.Division,
     return true;
   }
 
-  public boolean resume() throws IOException {
+  boolean resume() throws IOException {
     synchronized (this) {
       if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
         return false;
@@ -1516,7 +1517,7 @@ public class RaftServerImpl implements RaftServer.Division,
     return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId, term, lastEntry);
   }
 
-  public void submitUpdateCommitEvent() {
+  void submitUpdateCommitEvent() {
     role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
   }
 
@@ -1605,7 +1606,7 @@ public class RaftServerImpl implements RaftServer.Division,
    *
    * @param logEntry the log entry being truncated
    */
-  public void notifyTruncatedLogEntry(LogEntryProto logEntry) {
+  void notifyTruncatedLogEntry(LogEntryProto logEntry) {
     if (logEntry.hasStateMachineLogEntry()) {
       final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
       final RetryCache.CacheEntry cacheEntry = getRetryCache().get(invocationId);
@@ -1617,11 +1618,11 @@ public class RaftServerImpl implements RaftServer.Division,
     }
   }
 
-  public LeaderElectionMetrics getLeaderElectionMetrics() {
+  LeaderElectionMetrics getLeaderElectionMetrics() {
     return leaderElectionMetrics;
   }
 
-  public RaftServerMetrics getRaftServerMetrics() {
+  RaftServerMetrics getRaftServerMetrics() {
     return raftServerMetrics;
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index b18b07b..df702bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-public class RaftServerProxy implements RaftServer {
+class RaftServerProxy implements RaftServer {
   /**
    * A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
    *
@@ -110,8 +110,22 @@ public class RaftServerProxy implements RaftServer {
         return;
       }
       isClosed = true;
-      map.values().parallelStream().map(CompletableFuture::join)
-          .forEach(RaftServerImpl::shutdown);
+      map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue()));
+    }
+
+    private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
+      final RaftServerImpl impl;
+      try {
+        impl = future.join();
+      } catch (Throwable t) {
+        LOG.warn("{}: Failed to join the division for {}", getId(), groupId, t);
+        return;
+      }
+      try {
+        impl.close();
+      } catch (Throwable t) {
+        LOG.warn("{}: Failed to close the division for {}", getId(), groupId, t);
+      }
     }
 
     synchronized List<RaftGroupId> getGroupIds() {
@@ -292,19 +306,15 @@ public class RaftServerProxy implements RaftServer {
     return properties;
   }
 
-  public RaftServerRpc getServerRpc() {
+  RaftServerRpc getServerRpc() {
     return serverRpc;
   }
 
-  public DataStreamServerRpc getDataStreamServerRpc() {
+  DataStreamServerRpc getDataStreamServerRpc() {
     return dataStreamServerRpc;
   }
 
-  public boolean containsGroup(RaftGroupId groupId) {
-    return impls.containsGroup(groupId);
-  }
-
-  public CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+  private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
     return impls.addNew(group);
   }
 
@@ -316,12 +326,12 @@ public class RaftServerProxy implements RaftServer {
     return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId()));
   }
 
-  public RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
+  private RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
     Objects.requireNonNull(groupId, "groupId == null");
     return IOUtils.getFromFuture(getImplFuture(groupId), this::getId);
   }
 
-  public List<RaftServerImpl> getImpls() throws IOException {
+  List<RaftServerImpl> getImpls() throws IOException {
     final List<RaftServerImpl> list = new ArrayList<>();
     for(CompletableFuture<RaftServerImpl> f : impls.getAll()) {
       list.add(IOUtils.getFromFuture(f, this::getId));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 6dd551b..a40e0d1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -187,7 +187,11 @@ class ServerState implements Closeable {
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
       log = new MemoryRaftLog(memberId, lastIndexInSnapshot, prop);
     } else {
-      log = new SegmentedRaftLog(memberId, server, storage, lastIndexInSnapshot, prop);
+      log = new SegmentedRaftLog(memberId, server,
+          server.getStateMachine(),
+          server::notifyTruncatedLogEntry,
+          server::submitUpdateCommitEvent,
+          storage, lastIndexInSnapshot, prop);
     }
     log.open(lastIndexInSnapshot, logConsumer);
     return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 367c777..671f736 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -186,7 +186,7 @@ class StateMachineUpdater implements Runnable {
         } else {
           state = State.EXCEPTION;
           LOG.error(this + " caught a Throwable.", t);
-          server.shutdown();
+          server.close();
         }
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index dbdbced..1aa9616 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -20,8 +20,8 @@ package org.apache.ratis.server.raftlog.segmented;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -125,14 +125,10 @@ public class SegmentedRaftLog extends RaftLog {
     }
   }
 
-  /** The methods defined in {@link RaftServerImpl} which are used in {@link SegmentedRaftLog}. */
+  /** The server methods used in {@link SegmentedRaftLog}. */
   interface ServerLogMethods {
     ServerLogMethods DUMMY = new ServerLogMethods() {};
 
-    default boolean shouldEvictCache() {
-      return false;
-    }
-
     default long[] getFollowerNextIndices() {
       return null;
     }
@@ -150,18 +146,14 @@ public class SegmentedRaftLog extends RaftLog {
    * When the server is null, return the dummy instance of {@link ServerLogMethods}.
    * Otherwise, the server is non-null, return the implementation using the given server.
    */
-  private ServerLogMethods newServerLogMethods(RaftServerImpl impl) {
+  private ServerLogMethods newServerLogMethods(RaftServer.Division impl,
+      Consumer<LogEntryProto> notifyTruncatedLogEntry) {
     if (impl == null) {
       return ServerLogMethods.DUMMY;
     }
 
     return new ServerLogMethods() {
       @Override
-      public boolean shouldEvictCache() {
-        return cache.shouldEvict();
-      }
-
-      @Override
       public long[] getFollowerNextIndices() {
         return impl.getInfo().getFollowerNextIndices();
       }
@@ -175,7 +167,7 @@ public class SegmentedRaftLog extends RaftLog {
       public void notifyTruncatedLogEntry(TermIndex ti) {
         try {
           final LogEntryProto entry = get(ti.getIndex());
-          impl.notifyTruncatedLogEntry(entry);
+          notifyTruncatedLogEntry.accept(entry);
         } catch (RaftLogIOException e) {
           LOG.error("{}: Failed to read log {}", getName(), ti, e);
         }
@@ -191,18 +183,12 @@ public class SegmentedRaftLog extends RaftLog {
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
 
-  public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
-      RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
-    this(memberId, server, server != null? server.getStateMachine(): null,
-        server != null? server::submitUpdateCommitEvent: null,
-        storage, lastIndexInSnapshot, properties);
-  }
-
-  SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
-      StateMachine stateMachine, Runnable submitUpdateCommitEvent,
+  @SuppressWarnings("parameternumber")
+  public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server,
+      StateMachine stateMachine, Consumer<LogEntryProto> notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
     super(memberId, lastIndexInSnapshot, properties);
-    this.server = newServerLogMethods(server);
+    this.server = newServerLogMethods(server, notifyTruncatedLogEntry);
     this.storage = storage;
     this.stateMachine = stateMachine;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -317,7 +303,7 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   private void checkAndEvictCache() {
-    if (server.shouldEvictCache()) {
+    if (cache.shouldEvict()) {
       // TODO if the cache is hitting the maximum size and we cannot evict any
       // segment's cache, should block the new entry appending or new segment
       // allocation.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 63dd439..d7f7d35 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -24,8 +24,8 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -58,7 +58,7 @@ import java.util.function.Supplier;
  * This class takes the responsibility of all the raft log related I/O ops for a
  * raft peer.
  */
-class SegmentedRaftLogWorker implements Runnable {
+class SegmentedRaftLogWorker {
   static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
 
   static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
@@ -169,13 +169,13 @@ class SegmentedRaftLogWorker implements Runnable {
 
   private final long segmentMaxSize;
   private final long preallocatedSize;
-  private final RaftServerImpl server;
+  private final RaftServer.Division server;
   private int flushBatchSize;
 
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
   SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
-                         RaftServerImpl server, RaftStorage storage, RaftProperties properties,
+                         RaftServer.Division server, RaftStorage storage, RaftProperties properties,
                          RaftLogMetrics metricRegistry) {
     this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
     LOG.info("new {} for {}", name, storage);
@@ -197,7 +197,7 @@ class SegmentedRaftLogWorker implements Runnable {
 
     this.stateMachineDataPolicy = new StateMachineDataPolicy(properties);
 
-    this.workerThread = new Thread(this, name);
+    this.workerThread = new Thread(this::run, name);
 
     // Server Id can be null in unit tests
     metricRegistry.addDataQueueSizeGauge(queue);
@@ -272,9 +272,7 @@ class SegmentedRaftLogWorker implements Runnable {
             + ". The SegmentedRaftLogWorker already stopped.");
       } else {
         LOG.error("Failed to add IO task {}", task, e);
-        if (server != null) {
-          server.shutdown();
-        }
+        Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
       }
     }
     return task;
@@ -284,9 +282,7 @@ class SegmentedRaftLogWorker implements Runnable {
     return running && workerThread.isAlive();
   }
 
-  @Override
-  public void run() {
-
+  private void run() {
     // if and when a log task encounters an exception
     RaftLogIOException logIOException = null;
 
@@ -337,10 +333,7 @@ class SegmentedRaftLogWorker implements Runnable {
               Thread.currentThread().getName(), e);
         } else {
           LOG.error("{} hit exception", Thread.currentThread().getName(), e);
-          // Shutdown raft group instead of terminating jvm.
-          if (server != null) {
-            server.shutdown();
-          }
+          Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
         }
       }
     }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fb891a0..f614a9f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -39,7 +39,6 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
 import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index e473bc9..2fe6291 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -434,7 +434,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
       // Create an entry corresponding to the callId and clientId
       // in each server's retry cache.
       cluster.getServerAliveStream().forEach(
-          raftServer -> RetryCacheTestUtil.getOrCreateEntry(raftServer.getRetryCache(), invocationId));
+          raftServer -> RetryCacheTestUtil.getOrCreateEntry(raftServer, invocationId));
       // Client request for the callId now waits
       // as there is already a cache entry in the server for the request.
       // Ideally the client request should timeout and the client should retry.
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 8b85dc5..3a52585 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -331,9 +331,9 @@ public abstract class MiniRaftCluster implements Closeable {
     killServer(serverId);
     servers.remove(serverId);
 
-    final RaftServerProxy proxy = putNewServer(serverId, group, format);
+    final RaftServer proxy = putNewServer(serverId, group, format);
     proxy.start();
-    return group == null? null: proxy.getImpl(group.getGroupId());
+    return group == null? null: proxy.getDivision(group.getGroupId());
   }
 
   public void restart(boolean format) throws IOException {
@@ -571,7 +571,7 @@ public abstract class MiniRaftCluster implements Closeable {
    *         from the given group.
    */
   private List<RaftServer.Division> getLeaders(RaftGroupId groupId) {
-    final Stream<RaftServerImpl> serverAliveStream = getServerAliveStream(groupId);
+    final Stream<RaftServer.Division> serverAliveStream = getServerAliveStream(groupId);
     final List<RaftServer.Division> leaders = new ArrayList<>();
     serverAliveStream.filter(server -> server.getInfo().isLeader()).forEach(s -> {
       if (leaders.isEmpty()) {
@@ -611,25 +611,25 @@ public abstract class MiniRaftCluster implements Closeable {
 
   private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
     return servers.values().stream()
-        .filter(s -> groupId == null || s.containsGroup(groupId));
+        .filter(s -> groupId == null || s.getGroupIds().contains(groupId));
   }
 
   public Iterable<RaftServer.Division> iterateDivisions() {
     return CollectionUtils.as(getServers(), this::getDivision);
   }
 
-  private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
+  private Stream<RaftServer.Division> getServerStream(RaftGroupId groupId) {
     final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
     return groupId != null?
-        stream.map(s -> JavaUtils.callAsUnchecked(() -> s.getImpl(groupId)))
+        stream.map(s -> JavaUtils.callAsUnchecked(() -> s.getDivision(groupId)))
         : stream.flatMap(s -> JavaUtils.callAsUnchecked(s::getImpls).stream());
   }
 
-  public Stream<RaftServerImpl> getServerAliveStream() {
+  public Stream<RaftServer.Division> getServerAliveStream() {
     return getServerAliveStream(getGroupId());
   }
 
-  private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) {
+  private Stream<RaftServer.Division> getServerAliveStream(RaftGroupId groupId) {
     return getServerStream(groupId).filter(server -> server.getInfo().isAlive());
   }
 
@@ -641,6 +641,10 @@ public abstract class MiniRaftCluster implements Closeable {
     return servers.get(id);
   }
 
+  public ServerFactory getServerFactory(RaftPeerId id) {
+    return servers.get(id).getFactory();
+  }
+
   public RaftServer.Division getDivision(RaftPeerId id) {
     return getDivision(servers.get(id));
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 0897c80..cf9307c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -18,20 +18,26 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.log4j.Level;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Log4jUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
+import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,6 +47,11 @@ import java.util.Collection;
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 public class RaftServerTestUtil {
   static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
 
@@ -159,13 +170,30 @@ public class RaftServerTestUtil {
     return new DataStreamMapImpl(name);
   }
 
-  public static void shutdown(RaftServer.Division server) {
-    ((RaftServerImpl)server).shutdown();
-  }
-
   public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
     final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
     Assert.assertNotNull(f);
     Assert.assertTrue(f.lostMajorityHeartbeatsRecently());
   }
+
+  public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, DivisionInfo info,
+      RaftStorage storage, RaftProperties properties) {
+    final RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
+    Mockito.when(server.getInfo()).thenReturn(info);
+
+    return new SegmentedRaftLog(memberId, server, null,
+        server::notifyTruncatedLogEntry,
+        server::submitUpdateCommitEvent,
+        storage, -1, properties);
+  }
+
+  public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, RetryCache retryCache,
+      RaftStorage storage, RaftProperties properties) {
+    final RaftServerImpl server = mock(RaftServerImpl.class);
+    when(server.getRetryCache()).thenReturn(retryCache);
+    when(server.getMemberId()).thenReturn(memberId);
+    doCallRealMethod().when(server).notifyTruncatedLogEntry(any(RaftProtos.LogEntryProto.class));
+    return new SegmentedRaftLog(memberId, server, null,
+        server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent, storage, -1, properties);
+  }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index 2e6875f..70308ef 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.util.TimeDuration;
 import org.junit.Assert;
 
@@ -43,7 +44,7 @@ public class RetryCacheTestUtil {
     }
   }
 
-  public static void getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) {
-    cache.getOrCreateEntry(invocationId);
+  public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
+    ((RaftServerImpl)server).getRetryCache().getOrCreateEntry(invocationId);
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 4775e07..9692eef 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -104,7 +104,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
       Assert.assertTrue(secondFollower.getInfo().getLastAppliedIndex() < logIndex);
 
       // Now shutdown the follower in a separate thread
-      final Thread t = new Thread(() -> RaftServerTestUtil.shutdown(secondFollower));
+      final Thread t = new Thread(secondFollower::close);
       t.start();
 
       // The second follower should still be blocked in apply transaction
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
index 98f3a77..04527e7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import org.apache.log4j.Level;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.Log4jUtils;
 
 public interface SegmentedRaftLogTestUtils {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 50068d6..b91fb77 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -128,6 +128,9 @@ abstract class DataStreamBaseTest extends BaseTest {
     public RaftClient getRaftClient() {
       return this.client;
     }
+
+    @Override
+    public void close() {}
   }
 
   static class Server {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index d88980e..e652e86 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -30,6 +30,7 @@ import com.codahale.metrics.Gauge;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
@@ -115,10 +116,16 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
     // the rpc server on failure.
     RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId)));
     testFailureCase("start a new server with the same address",
-        () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null).start(),
+        () -> startNewServer(leaderId, cluster.getGroup(), stateMachine, p),
         IOException.class, OverlappingFileLockException.class);
     // Try to start a raft server rpc at the leader address.
-    cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
+    cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
+  }
+
+  static void startNewServer(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties)
+      throws Exception {
+    final RaftServer d = ServerImplUtils.newRaftServer(id, group, gid -> stateMachine, properties, null);
+    d.start();
   }
 
   @Test
@@ -128,7 +135,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
 
   void runTestUnsupportedMethods(MiniRaftClusterWithGrpc cluster) throws Exception {
     final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
-    final RaftServerRpc rpc = cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
+    final RaftServerRpc rpc = cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
 
     testFailureCase("appendEntries",
         () -> rpc.appendEntries(null),
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 3e545b4..22fabc0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -28,7 +28,7 @@ import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
@@ -165,13 +165,10 @@ public class TestCacheEviction extends BaseTest {
     RaftServerConfigKeys.setStorageDir(prop,  Collections.singletonList(storageDir));
     RaftStorage storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR);
 
-    RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
     final DivisionInfo info = Mockito.mock(DivisionInfo.class);
     Mockito.when(info.getLastAppliedIndex()).thenReturn(0L);
     Mockito.when(info.getFollowerNextIndices()).thenReturn(new long[]{});
-    Mockito.when(server.getInfo()).thenReturn(info);
-
-    SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, prop);
+    final SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, info, storage, prop);
     raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
     List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0);
     LogEntryProto[] entries = generateEntries(slist);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index ce81bb9..e2cd6acc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -28,9 +28,9 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.impl.RetryCache;
-import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -64,12 +64,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 import com.codahale.metrics.Timer;
 
 public class TestSegmentedRaftLog extends BaseTest {
@@ -109,7 +103,11 @@ public class TestSegmentedRaftLog extends BaseTest {
   private int bufferSize;
 
   SegmentedRaftLog newSegmentedRaftLog() {
-    return new SegmentedRaftLog(memberId, null, storage, -1, properties);
+    return newSegmentedRaftLog(storage, properties);
+  }
+
+  static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties properties) {
+    return new SegmentedRaftLog(memberId, null, null, null, null, storage, -1, properties);
   }
 
   @Before
@@ -430,7 +428,7 @@ public class TestSegmentedRaftLog extends BaseTest {
 
     final RaftProperties p = new RaftProperties();
     RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, storage, -1, p)) {
+    try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
       final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
@@ -449,13 +447,8 @@ public class TestSegmentedRaftLog extends BaseTest {
     List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
     List<LogEntryProto> entries = prepareLogEntries(ranges, null);
 
-    RaftServerImpl server = mock(RaftServerImpl.class);
     RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
-    when(server.getRetryCache()).thenReturn(retryCache);
-    final RaftGroupMemberId id = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("s0"), RaftGroupId.randomId());
-    when(server.getMemberId()).thenReturn(id);
-    doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
       // append entries to the raftlog
@@ -470,7 +463,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     List<LogEntryProto> newEntries = prepareLogEntries(
         Arrays.asList(r1, r2, r3), null);
 
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       LOG.info("newEntries[0] = {}", newEntries.get(0));
       final int last = newEntries.size() - 1;
@@ -487,7 +480,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     }
 
     // load the raftlog again and check
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       checkEntries(raftLog, entries, 0, 650);
       checkEntries(raftLog, newEntries, 100, 100);
@@ -507,7 +500,7 @@ public class TestSegmentedRaftLog extends BaseTest {
     final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>());
 
     final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
 
       int next = 0;
@@ -571,10 +564,8 @@ public class TestSegmentedRaftLog extends BaseTest {
       }
     };
 
-    RaftServerImpl server = mock(RaftServerImpl.class);
-    doNothing().when(server).shutdown();
     Throwable ex = null; // TimeoutIOException
-    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, sm, null, storage, -1, properties)) {
+    try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, -1, properties)) {
       raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
       // SegmentedRaftLogWorker should catch TimeoutIOException
       CompletableFuture<Long> f = raftLog.appendEntry(entry);