You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/18 04:00:23 UTC

[incubator-ratis] branch master updated: RATIS-1162. Enable testing clusters with Netty data stream and gRPC. (#282)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new dc60590  RATIS-1162. Enable testing clusters with Netty data stream and gRPC. (#282)
dc60590 is described below

commit dc6059064b783686d72a3d2b8bf42bc374db095b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Nov 18 12:00:13 2020 +0800

    RATIS-1162. Enable testing clusters with Netty data stream and gRPC. (#282)
    
    * RATIS-1162. Enable testing clusters with Netty data stream and gRPC.
    
    * Add MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.
---
 .../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 10 ++--
 .../ratis/netty/MiniRaftClusterWithNetty.java      |  8 +--
 .../ratis/datastream/DataStreamBaseTest.java       | 59 +++++++++++++++++-----
 ...treamTests.java => DataStreamClusterTests.java} | 43 ++++------------
 ...usterWithRpcTypeGrpcAndDataStreamTypeNetty.java | 50 ++++++++----------
 ...sterWithRpcTypeNettyAndDataStreamTypeNetty.java | 48 ++++++++----------
 ...ava => TestNettyDataStreamWithGrpcCluster.java} | 11 ++--
 ...va => TestNettyDataStreamWithNettyCluster.java} | 11 ++--
 8 files changed, 116 insertions(+), 124 deletions(-)

diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 0174860..9a2cee3 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,12 +30,14 @@ import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.IOException;
 
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and data stream disabled.
+ */
 public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithGrpc> FACTORY
       = new Factory<MiniRaftClusterWithGrpc>() {
     @Override
-    public MiniRaftClusterWithGrpc newCluster(
-        String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithGrpc newCluster(String[] ids, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
       return new MiniRaftClusterWithGrpc(ids, prop);
     }
@@ -51,7 +53,7 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
   public static final DelayLocalExecutionInjection sendServerRequestInjection =
       new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
 
-  private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
+  protected MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
     super(ids, properties, null);
   }
 
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 657dd11..f5a9b15 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -30,6 +30,9 @@ import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.IOException;
 
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#NETTY}} and data stream disabled.
+ */
 public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   public static final Factory<MiniRaftClusterWithNetty> FACTORY
       = new Factory<MiniRaftClusterWithNetty>() {
@@ -50,7 +53,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
   public static final DelayLocalExecutionInjection sendServerRequest
       = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
 
-  private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
+  protected MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
     super(ids, properties, null);
   }
 
@@ -59,7 +62,6 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
       RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
       RaftProperties properties) throws IOException {
     NettyConfigKeys.Server.setPort(properties, getPort(id, group));
-    NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));
     return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null);
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index f075048..b206b0c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -63,6 +63,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -88,17 +89,43 @@ abstract class DataStreamBaseTest extends BaseTest {
   private final Executor executor = Executors.newFixedThreadPool(16);
 
   static class MultiDataStreamStateMachine extends BaseStateMachine {
-    final ConcurrentMap<Long, SingleDataStream> streams = new ConcurrentHashMap<>();
+    static class Key {
+      private final ClientId clientId;
+      private final long callId;
+
+      Key(RaftClientRequest request) {
+        this.clientId = request.getClientId();
+        this.callId = request.getCallId();
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        if (this == obj) {
+          return true;
+        } else if (obj == null || getClass() != obj.getClass()) {
+          return false;
+        }
+        final Key that = (Key) obj;
+        return this.callId == that.callId && Objects.equals(this.clientId, that.clientId);
+      }
+
+      @Override
+      public int hashCode() {
+        return Objects.hash(clientId, callId);
+      }
+    }
+
+    private final ConcurrentMap<Key, SingleDataStream> streams = new ConcurrentHashMap<>();
 
     @Override
     public CompletableFuture<DataStream> stream(RaftClientRequest request) {
       final SingleDataStream s = new SingleDataStream(request);
-      streams.put(request.getCallId(), s);
+      streams.put(new Key(request), s);
       return CompletableFuture.completedFuture(s);
     }
 
-    SingleDataStream getSingleDataStream(long callId) {
-      return streams.get(callId);
+    SingleDataStream getSingleDataStream(RaftClientRequest request) {
+      return streams.get(new Key(request));
     }
   }
 
@@ -260,7 +287,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       @Override
       public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
         final MultiDataStreamStateMachine stateMachine = getStateMachine(request.getRaftGroupId());
-        final SingleDataStream stream = stateMachine.getSingleDataStream(request.getCallId());
+        final SingleDataStream stream = stateMachine.getSingleDataStream(request);
         Assert.assertFalse(stream.getWritableByteChannel().isOpen());
         return CompletableFuture.completedFuture(RaftClientReply.newBuilder()
             .setRequest(request)
@@ -449,8 +476,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
-  CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
-    LOG.info("start stream {}", out.getHeader().getCallId());
+  static int writeAndAssertReplies(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     final List<Integer> sizes = new ArrayList<>();
 
@@ -480,20 +506,25 @@ abstract class DataStreamBaseTest extends BaseTest {
       Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
       Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
     }
+    return dataSize;
+  }
+
+  CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+    LOG.info("start Stream{}", out.getHeader().getCallId());
+    final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
     try {
       for (Server s : servers) {
-        assertHeader(s, out.getHeader(), dataSize);
+        assertHeader(s, out.getHeader(), bytesWritten);
       }
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
 
-    final long byteWritten = dataSize;
-    return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, byteWritten));
+    return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, bytesWritten));
   }
 
-  CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
-      long byteWritten) {
+  static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
+      long bytesWritten) {
     // Test close idempotent
     Assert.assertSame(dataStreamReply, out.closeAsync().join());
     testFailureCase("writeAsync should fail",
@@ -507,7 +538,7 @@ abstract class DataStreamBaseTest extends BaseTest {
       if (reply.isSuccess()) {
         final ByteString bytes = reply.getMessage().getContent();
         if (!bytes.equals(MOCK)) {
-          Assert.assertEquals(bytesWritten2ByteString(byteWritten), bytes);
+          Assert.assertEquals(bytesWritten2ByteString(bytesWritten), bytes);
         }
       }
 
@@ -519,7 +550,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
   void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
     final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
-    final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
+    final SingleDataStream stream = s.getSingleDataStream(header);
     Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
similarity index 67%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 1e6459f..4b9cac2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -17,17 +17,13 @@
  */
 package org.apache.ratis.datastream;
 
-import static org.apache.ratis.RaftTestUtil.waitForLeader;
-
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
-import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -36,21 +32,17 @@ import org.apache.ratis.server.raftlog.RaftLog;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Collection;
-import java.util.concurrent.ThreadLocalRandom;
 
-public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
   {
-    RaftConfigKeys.DataStream.setType(getProperties(), SupportedDataStreamType.NETTY);
     setStateMachine(MultiDataStreamStateMachine.class);
   }
 
   public static final int NUM_SERVERS = 3;
-  // TODO: change bufferSize and bufferNum configurable
-  private static int bufferSize = 1_000_000;
-  private static int bufferNum =  10;
 
   @Test
   public void testStreamWrites() throws Exception {
@@ -66,31 +58,18 @@ public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends
     RaftPeer raftPeer = peers.iterator().next();
 
     try (RaftClient client = cluster.createClient(raftPeer)) {
-      // send header
-      DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
-
-      // send data
-      final int halfBufferSize = bufferSize / 2;
-      int dataSize = 0;
-      for(int i = 0; i < bufferNum; i++) {
-        final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
-
-        final ByteBuffer bf = DataStreamBaseTest.initBuffer(dataSize, size);
-        dataStreamOutputRpc.writeAsync(bf);
-        dataSize += size;
+      // send a stream request
+      final long callId;
+      try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+        DataStreamBaseTest.writeAndAssertReplies(out, 1000, 10);
+        callId = out.getHeader().getCallId();
       }
 
-      // send close
-      dataStreamOutputRpc.closeAsync().join();
-
-      // get request call id
-      long callId = dataStreamOutputRpc.getHeaderFuture().get().getStreamId();
-
       // verify the write request is in the Raft log.
       RaftLog log = leader.getState().getLog();
       boolean transactionFound = false;
       for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
-        RaftProtos.LogEntryProto entryProto = log.get(termIndex.getIndex());
+        final LogEntryProto entryProto = log.get(termIndex.getIndex());
         if (entryProto.hasStateMachineLogEntry()) {
             StateMachineLogEntryProto stateMachineEntryProto = entryProto.getStateMachineLogEntry();
             if (stateMachineEntryProto.getCallId() == callId) {
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
similarity index 50%
copy from ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 0174860..a68b2e1 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,58 +15,50 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.grpc;
+package org.apache.ratis.datastream;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.server.GrpcService;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.IOException;
 
-public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
-  public static final Factory<MiniRaftClusterWithGrpc> FACTORY
-      = new Factory<MiniRaftClusterWithGrpc>() {
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and {@link SupportedDataStreamType#NETTY}.
+ */
+public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRaftClusterWithGrpc {
+  public static final Factory<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> FACTORY
+      = new Factory<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>() {
     @Override
-    public MiniRaftClusterWithGrpc newCluster(
-        String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
-      return new MiniRaftClusterWithGrpc(ids, prop);
+      RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
+      return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids, prop);
     }
   };
 
-  public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
+  public interface FactoryGet extends Factory.Get<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> {
     @Override
-    default Factory<MiniRaftClusterWithGrpc> getFactory() {
+    default Factory<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> getFactory() {
       return FACTORY;
     }
   }
 
-  public static final DelayLocalExecutionInjection sendServerRequestInjection =
-      new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
-
-  private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
-    super(ids, properties, null);
+  private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids, RaftProperties properties) {
+    super(ids, properties);
   }
 
   @Override
-  protected RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group,
+  protected RaftServerProxy newRaftServer(RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group,
       RaftProperties properties) throws IOException {
-    GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
-    return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null);
-  }
-
-  @Override
-  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
-      throws InterruptedException {
-    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
-        leaderId, delayMs, getTimeoutMax());
+    NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));
+    return super.newRaftServer(id, stateMachineRegistry, group, properties);
   }
 }
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
similarity index 52%
copy from ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
index 657dd11..1a493e4 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,58 +15,50 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.netty;
+package org.apache.ratis.datastream;
 
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.server.NettyRpcService;
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.statemachine.StateMachine;
 
 import java.io.IOException;
 
-public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
-  public static final Factory<MiniRaftClusterWithNetty> FACTORY
-      = new Factory<MiniRaftClusterWithNetty>() {
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#NETTY}} and {@link SupportedDataStreamType#NETTY}.
+ */
+public class MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniRaftClusterWithNetty {
+  public static final Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> FACTORY
+      = new Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>() {
     @Override
-    public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
+    public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) {
       RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
-      return new MiniRaftClusterWithNetty(ids, prop);
+      RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
+      return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids, prop);
     }
   };
 
-  public interface FactoryGet extends Factory.Get<MiniRaftClusterWithNetty> {
+  public interface FactoryGet extends Factory.Get<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> {
     @Override
-    default Factory<MiniRaftClusterWithNetty> getFactory() {
+    default Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> getFactory() {
       return FACTORY;
     }
   }
 
-  public static final DelayLocalExecutionInjection sendServerRequest
-      = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
-
-  private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
-    super(ids, properties, null);
+  private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids, RaftProperties properties) {
+    super(ids, properties);
   }
 
   @Override
-  protected RaftServerProxy newRaftServer(
-      RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
+  protected RaftServerProxy newRaftServer(RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group,
       RaftProperties properties) throws IOException {
-    NettyConfigKeys.Server.setPort(properties, getPort(id, group));
     NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));
-    return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null);
-  }
-
-  @Override
-  protected void blockQueueAndSetDelay(String leaderId, int delayMs)
-      throws InterruptedException {
-    RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,
-        leaderId, delayMs, getTimeoutMax());
+    return super.newRaftServer(id, stateMachineRegistry, group, properties);
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
similarity index 71%
copy from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
index 8156820..ce5b25d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,10 +17,7 @@
  */
 package org.apache.ratis.datastream;
 
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
-import org.apache.ratis.netty.MiniRaftClusterWithNetty;
-
-public class TestDataStreamWithNettyMiniRaftCluster extends DataStreamTests<MiniRaftClusterWithNetty>
-    implements MiniRaftClusterWithNetty.FactoryGet {
+public class TestNettyDataStreamWithGrpcCluster
+    extends DataStreamClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
+    implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithNettyCluster.java
similarity index 71%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithNettyCluster.java
index 8156820..dd27924 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithNettyCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,10 +17,7 @@
  */
 package org.apache.ratis.datastream;
 
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
-import org.apache.ratis.netty.MiniRaftClusterWithNetty;
-
-public class TestDataStreamWithNettyMiniRaftCluster extends DataStreamTests<MiniRaftClusterWithNetty>
-    implements MiniRaftClusterWithNetty.FactoryGet {
+public class TestNettyDataStreamWithNettyCluster
+    extends DataStreamClusterTests<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>
+    implements MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.FactoryGet {
 }