You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 23:35:31 UTC

[incubator-ratis] branch master updated: RATIS-1226. The primary server should get the other peers from the current RaftConfiguration. (#343)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e94069  RATIS-1226. The primary server should get the other peers from the current RaftConfiguration. (#343)
6e94069 is described below

commit 6e940698b30b5a6a2e3970b81cbf23c981dd03b9
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 10 07:35:23 2020 +0800

    RATIS-1226. The primary server should get the other peers from the current RaftConfiguration. (#343)
    
    * RATIS-1226. The primary server should get the other peers from the current RaftConfiguration.
    
    * Fix TestNettyDataStreamWithMock.
---
 .../ratis/examples/filestore/cli/Server.java       |  7 -------
 .../org/apache/ratis/grpc/server/GrpcService.java  |  2 +-
 .../apache/ratis/grpc/MiniRaftClusterWithGrpc.java |  4 ++--
 .../ratis/hadooprpc/server/HadoopRpcService.java   |  2 +-
 .../ratis/netty/server/DataStreamManagement.java   | 23 ++++++++++++++++------
 .../apache/ratis/netty/server/NettyRpcService.java |  2 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   | 17 ++++------------
 .../ratis/netty/MiniRaftClusterWithNetty.java      |  4 ++--
 .../java/org/apache/ratis/server/RaftServer.java   |  4 ++++
 .../server/{impl => }/RaftServerRpcWithProxy.java  |  3 +--
 .../ratis/server/impl/DataStreamServerImpl.java    |  4 ++--
 .../apache/ratis/server/impl/RaftServerImpl.java   |  3 +--
 .../apache/ratis/server/impl/RaftServerProxy.java  | 10 +++++++++-
 .../apache/ratis/server/impl/MiniRaftCluster.java  | 12 -----------
 .../ratis/server/impl/RaftServerTestUtil.java      |  9 +++++++++
 .../ratis/datastream/DataStreamBaseTest.java       | 21 +++++++++++++-------
 .../datastream/TestNettyDataStreamWithMock.java    |  4 ----
 17 files changed, 68 insertions(+), 63 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 192d721..ac29d96 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -35,7 +35,6 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
@@ -44,7 +43,6 @@ import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
 import java.io.File;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
@@ -96,11 +94,6 @@ public class Server extends SubCommandBase {
 
     raftServer.start();
 
-    if (isPrimary(id)) {
-      ((RaftServerProxy) raftServer).getDataStreamServerRpc()
-          .addRaftPeers(getOtherRaftPeers(Arrays.asList(getPeers())));
-    }
-
     for (; raftServer.getLifeCycleState() != LifeCycle.State.CLOSED; ) {
       TimeUnit.SECONDS.sleep(1);
     }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index ce0eef7..0d0f72e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -27,7 +27,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.server.RaftServerRpcWithProxy;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 2c68ad5..4864183 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.Parameters;
@@ -26,7 +25,8 @@ import org.apache.ratis.grpc.server.GrpcService;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.MiniRaftCluster;
 
 /**
  * A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and data stream disabled.
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index b75938e..8552765 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -38,7 +38,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.server.RaftServerRpcWithProxy;
 import org.apache.ratis.server.protocol.RaftServerProtocol;
 import com.google.protobuf.BlockingService;
 import com.google.protobuf.ByteString;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 0b6804b..73aa303 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.server.RaftServer;
@@ -43,7 +44,7 @@ import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
-import org.apache.ratis.util.function.CheckedFunction;
+import org.apache.ratis.util.function.CheckedBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -226,13 +227,23 @@ public class DataStreamManagement {
   }
 
   private StreamInfo newStreamInfo(ByteBuf buf,
-      CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
+      CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
     try {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final boolean isPrimary = server.getId().equals(request.getServerId());
-      return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request),
-          isPrimary? getDataStreamOutput.apply(request): Collections.emptyList());
+      final List<DataStreamOutputRpc> outs;
+      if (isPrimary) {
+        final RaftGroupId groupId = request.getRaftGroupId();
+        // get the other peers from the current configuration
+        final List<RaftPeer> others = server.getDivision(groupId).getRaftConf().getCurrentPeers().stream()
+            .filter(p -> !p.getId().equals(server.getId()))
+            .collect(Collectors.toList());
+        outs = getStreams.apply(request, others);
+      } else {
+        outs = Collections.emptyList();
+      }
+      return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), outs);
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
@@ -341,13 +352,13 @@ public class DataStreamManagement {
   }
 
   void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
-      CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
+      CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
-      final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getDataStreamOutput));
+      final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
       info = streams.computeIfAbsent(key, id -> supplier.get());
       if (!supplier.isInitialized()) {
         throw new IllegalStateException("Failed to create a new stream for " + request
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 4b0f02f..7c4b1b3 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -27,7 +27,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.server.RaftServerRpcWithProxy;
 import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
 import org.apache.ratis.thirdparty.io.netty.channel.*;
 import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 9b06e24..82811b7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -58,21 +58,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
 
-  /**
-   * Proxies to other peers.
-   *
-   * Invariant: all the {@link #peers} must exist in the {@link #map}.
-   */
+  /** Proxies to other peers. */
   static class Proxies {
-    private final Set<RaftPeer> peers = new CopyOnWriteArraySet<>();
     private final PeerProxyMap<DataStreamClient> map;
 
     Proxies(PeerProxyMap<DataStreamClient> map) {
@@ -82,14 +75,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     void addPeers(Collection<RaftPeer> newPeers) {
       // add to the map first in order to preserve the invariant.
       map.addRaftPeers(newPeers);
-      // must use atomic addAll
-      peers.addAll(newPeers);
     }
 
-    List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request) throws IOException {
+    List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers) throws IOException {
       final List<DataStreamOutputRpc> outs = new ArrayList<>();
       try {
-        getDataStreamOutput(outs, request);
+        getDataStreamOutput(request, peers, outs);
       } catch (IOException e) {
         outs.forEach(DataStreamOutputRpc::closeAsync);
         throw e;
@@ -97,7 +88,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftClientRequest request)
+    private void getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers, List<DataStreamOutputRpc> outs)
         throws IOException {
       for (RaftPeer peer : peers) {
         try {
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index d444549..16ccffd 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.netty;
 
-import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.Parameters;
@@ -26,7 +25,8 @@ import org.apache.ratis.netty.server.NettyRpcService;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.MiniRaftCluster;
 
 /**
  * A {@link MiniRaftCluster} with {{@link SupportedRpcType#NETTY}} and data stream disabled.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 838038a..95bd846 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -126,6 +126,10 @@ public interface RaftServer extends Closeable, RpcType.Get,
   /** @return the rpc service. */
   RaftServerRpc getServerRpc();
 
+  /** @return the data stream rpc service. */
+  DataStreamServerRpc getDataStreamServerRpc();
+
+  /** @return the {@link RpcType}. */
   default RpcType getRpcType() {
     return getFactory().getRpcType();
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpcWithProxy.java
similarity index 97%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
rename to ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpcWithProxy.java
index ba10ddb..35ac277 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpcWithProxy.java
@@ -15,11 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server;
 
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.PeerProxyMap;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 7f2e57c..f1b3854 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -29,12 +29,12 @@ import org.apache.ratis.server.RaftServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class DataStreamServerImpl implements DataStreamServer {
+class DataStreamServerImpl implements DataStreamServer {
   public static final Logger LOG = LoggerFactory.getLogger(DataStreamServerImpl.class);
 
   private final DataStreamServerRpc serverRpc;
 
-  public DataStreamServerImpl(RaftServer server, Parameters parameters) {
+  DataStreamServerImpl(RaftServer server, Parameters parameters) {
     final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(server.getProperties(), LOG::info);
     this.serverRpc = DataStreamServerFactory.newInstance(type, parameters).newDataStreamServerRpc(server);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6669216..6cccce0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -903,8 +903,7 @@ class RaftServerImpl implements RaftServer.Division,
         return pending.getFuture();
       }
 
-      // add new peers into the rpc service
-      getServerRpc().addRaftPeers(peersInNewConf);
+      getRaftServer().addRaftPeers(peersInNewConf);
       // add staging state into the leaderState
       pending = leaderState.startSetConfiguration(request);
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f285d57..069386f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -54,6 +54,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -242,10 +243,16 @@ public class RaftServerProxy implements RaftServer {
     raftGroup.ifPresent(this::addGroup);
   }
 
+  void addRaftPeers(Collection<RaftPeer> peers) {
+    final List<RaftPeer> others = peers.stream().filter(p -> !p.getId().equals(getId())).collect(Collectors.toList());
+    getServerRpc().addRaftPeers(others);
+    getDataStreamServerRpc().addRaftPeers(others);
+  }
+
   private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
     return CompletableFuture.supplyAsync(() -> {
       try {
-        serverRpc.addRaftPeers(group.getPeers());
+        addRaftPeers(group.getPeers());
         return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
       } catch(IOException e) {
         throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
@@ -307,6 +314,7 @@ public class RaftServerProxy implements RaftServer {
     return serverRpc;
   }
 
+  @Override
   public DataStreamServerRpc getDataStreamServerRpc() {
     return dataStreamServerRpc;
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index acf0735..177f5e4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -280,17 +280,6 @@ public abstract class MiniRaftCluster implements Closeable {
     return this;
   }
 
-  private void initDataStreamServer() {
-    LOG.info("Setting up data stream servers");
-    for (RaftServerProxy serverProxy : servers.values()) {
-      serverProxy.getDataStreamServerRpc().addRaftPeers(getOtherRaftPeers(serverProxy.getId()));
-    }
-  }
-
-  private Collection<RaftPeer> getOtherRaftPeers(RaftPeerId id) {
-    return peers.values().stream().filter(r -> !r.getId().equals(id)).collect(Collectors.toList());
-  }
-
   public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
     final RaftServerProxy s = newRaftServer(id, group, format);
     Preconditions.assertTrue(servers.put(id, s) == null);
@@ -313,7 +302,6 @@ public abstract class MiniRaftCluster implements Closeable {
     LOG.info(".............................................................. ");
 
     initServers();
-    initDataStreamServer();
     startServers(servers.values());
 
     this.timer.updateAndGet(t -> t != null? t
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 55f650b..9fc50f1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServer;
@@ -115,6 +116,10 @@ public class RaftServerTestUtil {
     return (ConfigurationManager) Whitebox.getInternalState(getState(server), "configurationManager");
   }
 
+  public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer> peers) {
+    return RaftConfigurationImpl.newBuilder().setConf(peers).build();
+  }
+
   public static RaftConfiguration getRaftConf(RaftServer.Division server) {
     return ((RaftServerImpl)server).getRaftConf();
   }
@@ -149,6 +154,10 @@ public class RaftServerTestUtil {
     return JavaUtils.callAsUnchecked(() -> server.getDivision(groupId));
   }
 
+  public static DataStreamServer newDataStreamServer(RaftServer server) {
+    return new DataStreamServerImpl(server, null);
+  }
+
   public static DataStreamMap newDataStreamMap(Object name) {
     return new DataStreamMapImpl(name);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 6694db5..579a965 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,6 +18,8 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.server.DataStreamServer;
+import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.DivisionProperties;
 import org.apache.ratis.server.RaftServerRpc;
@@ -54,7 +56,6 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.ServerFactory;
@@ -79,7 +80,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 abstract class DataStreamBaseTest extends BaseTest {
-  static class MyDivision implements RaftServer.Division {
+  class MyDivision implements RaftServer.Division {
     private final RaftServer server;
     private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
     private final DataStreamMap streamMap;
@@ -107,7 +108,8 @@ abstract class DataStreamBaseTest extends BaseTest {
 
     @Override
     public RaftConfiguration getRaftConf() {
-      return null;
+      final List<RaftPeer> peers = servers.stream().map(Server::getPeer).collect(Collectors.toList());
+      return RaftServerTestUtil.newRaftConfiguration(peers);
     }
 
     @Override
@@ -161,12 +163,12 @@ abstract class DataStreamBaseTest extends BaseTest {
   static class Server {
     private final RaftPeer peer;
     private final RaftServer raftServer;
-    private final DataStreamServerImpl dataStreamServer;
+    private final DataStreamServer dataStreamServer;
 
     Server(RaftPeer peer, RaftServer raftServer) {
       this.peer = peer;
       this.raftServer = raftServer;
-      this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
+      this.dataStreamServer = RaftServerTestUtil.newDataStreamServer(raftServer);
     }
 
     RaftPeer getPeer() {
@@ -203,7 +205,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     return new MyRaftServer(peer, properties);
   }
 
-  static class MyRaftServer implements RaftServer {
+  class MyRaftServer implements RaftServer {
       private final RaftPeer peer;
       private final RaftProperties properties;
       private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
@@ -259,6 +261,11 @@ abstract class DataStreamBaseTest extends BaseTest {
       }
 
       @Override
+      public DataStreamServerRpc getDataStreamServerRpc() {
+        return null;
+      }
+
+      @Override
       public RaftClientReply submitClientRequest(RaftClientRequest request) {
         return submitClientRequestAsync(request).join();
       }
@@ -277,7 +284,7 @@ abstract class DataStreamBaseTest extends BaseTest {
             .thenApply(channel -> buildRaftClientReply(request, channel));
       }
 
-      static RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
+      RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
         Assert.assertTrue(channel instanceof MyDataChannel);
         final MyDataChannel dataChannel = (MyDataChannel) channel;
         return RaftClientReply.newBuilder()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index e3076de..10803fa 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -27,10 +27,8 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftServer;
@@ -45,8 +43,6 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 import static org.mockito.Mockito.mock;