You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/11 01:21:01 UTC

[incubator-ratis] branch master updated: RATIS-1145. In NettyServerStreamRpc, the local/remote writes should only wait for the previous write. (#267)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new d289643  RATIS-1145. In NettyServerStreamRpc, the local/remote writes should only wait for the previous write. (#267)
d289643 is described below

commit d2896437e8a28d89de92618ca4041fb5e89ab674
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Nov 11 09:18:12 2020 +0800

    RATIS-1145. In NettyServerStreamRpc, the local/remote writes should only wait for the previous write. (#267)
---
 .../ratis/netty/server/NettyServerStreamRpc.java   | 214 ++++++++++++---------
 .../apache/ratis/server/RaftServerConfigKeys.java  |   2 +-
 .../ratis/datastream/DataStreamBaseTest.java       |  64 +++---
 .../ratis/datastream/TestDataStreamNetty.java      |   6 +-
 4 files changed, 166 insertions(+), 120 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index dff6a87..c8528e5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -32,7 +32,6 @@ import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -68,11 +67,12 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 public class NettyServerStreamRpc implements DataStreamServerRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -124,33 +124,92 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     }
   }
 
+  static class LocalStream {
+    private final CompletableFuture<DataStream> streamFuture;
+    private final AtomicReference<CompletableFuture<Long>> writeFuture;
+
+    LocalStream(CompletableFuture<DataStream> streamFuture) {
+      this.streamFuture = streamFuture;
+      this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
+    }
+
+    CompletableFuture<Long> write(ByteBuf buf, Executor executor) {
+      return composeAsync(writeFuture, executor,
+          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, stream), executor));
+    }
+
+    CompletableFuture<Long> close(Executor executor) {
+      return composeAsync(writeFuture, executor,
+          n -> streamFuture.thenApplyAsync(NettyServerStreamRpc::close, executor));
+    }
+  }
+
+  static class RemoteStream {
+    private final DataStreamOutputRpc out;
+    private final AtomicReference<CompletableFuture<DataStreamReply>> writeFuture;
+
+    RemoteStream(DataStreamOutputRpc out) {
+      this.out = out;
+      this.writeFuture = new AtomicReference<>(out.getHeaderFuture());
+    }
+
+    CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
+      return composeAsync(writeFuture, executor, v -> out.writeAsync(request.slice().nioBuffer()));
+    }
+
+    CompletableFuture<Boolean> startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
+        Executor executor) {
+      return out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      }, executor);
+    }
+
+    CompletableFuture<DataStreamReply> close(Executor executor) {
+      return composeAsync(writeFuture, executor, v -> out.closeAsync());
+    }
+  }
+
   static class StreamInfo {
     private final RaftClientRequest request;
-    private final CompletableFuture<DataStream> stream;
-    private final List<DataStreamOutputRpc> outs;
-    private final AtomicReference<CompletableFuture<?>> previous
+    private final boolean primary;
+    private final LocalStream local;
+    private final List<RemoteStream> remotes;
+    private final AtomicReference<CompletableFuture<Void>> previous
         = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
-    StreamInfo(RaftClientRequest request, CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
+    StreamInfo(RaftClientRequest request, boolean primary,
+        CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
       this.request = request;
-      this.stream = stream;
-      this.outs = outs;
+      this.primary = primary;
+      this.local = new LocalStream(stream);
+      this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toList());
     }
 
-    CompletableFuture<DataStream> getStream() {
-      return stream;
+    AtomicReference<CompletableFuture<Void>> getPrevious() {
+      return previous;
     }
 
-    List<DataStreamOutputRpc> getDataStreamOutputs() {
-      return outs;
+    RaftClientRequest getRequest() {
+      return request;
     }
 
-    AtomicReference<CompletableFuture<?>> getPrevious() {
-      return previous;
+    boolean isPrimary() {
+      return primary;
     }
 
-    RaftClientRequest getRequest() {
-      return request;
+    LocalStream getLocal() {
+      return local;
+    }
+
+    <T> List<T> applyToRemotes(Function<RemoteStream, T> function) {
+      return remotes.isEmpty()?Collections.emptyList(): remotes.stream().map(function).collect(Collectors.toList());
     }
 
     @Override
@@ -213,10 +272,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
   private final ChannelFuture channelFuture;
 
   private final StreamMap streams = new StreamMap();
-
   private final Proxies proxies;
 
-  private final ExecutorService executorService;
+  private final Executor executor;
 
   public NettyServerStreamRpc(RaftServer server) {
     this.server = server;
@@ -232,7 +290,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         .childOption(ChannelOption.SO_KEEPALIVE, true)
         .bind(port);
     this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
-    this.executorService = Executors.newFixedThreadPool(
+    this.executor = Executors.newFixedThreadPool(
         RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
   }
 
@@ -254,19 +312,22 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
-      return new StreamInfo(request, stateMachine.data().stream(request),
-          isPrimary(request.getServerId())?
-          proxies.getDataStreamOutput(request) : Collections.EMPTY_LIST);
+      final boolean isPrimary = server.getId().equals(request.getServerId());
+      return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
+          isPrimary? proxies.getDataStreamOutput(request): Collections.emptyList());
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
   }
 
-  private long writeTo(ByteBuf buf, DataStream stream) {
-    if (stream == null) {
-      return 0;
-    }
+  static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor,
+      Function<T, CompletableFuture<T>> function) {
+    final CompletableFuture<T> composed = future.get().thenComposeAsync(function, executor);
+    future.set(composed);
+    return composed;
+  }
 
+  static long writeTo(ByteBuf buf, DataStream stream) {
     final WritableByteChannel channel = stream.getWritableByteChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
@@ -279,17 +340,25 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     return byteWritten;
   }
 
+  static long close(DataStream stream) {
+    try {
+      stream.getWritableByteChannel().close();
+      return 0L;
+    } catch (IOException e) {
+      throw new CompletionException("Failed to close " + stream, e);
+    }
+  }
+
   private void sendReplyNotSuccess(DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
         request.getStreamId(), request.getStreamOffset(), null, -1, false, request.getType());
     ctx.writeAndFlush(reply);
   }
 
-  private void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
+  static void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
       ChannelHandlerContext ctx) {
     final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
         request.getStreamId(), request.getStreamOffset(), buffer, bytesWritten, true, request.getType());
-    LOG.debug("{}: write {}", this, reply);
     ctx.writeAndFlush(reply);
   }
 
@@ -311,9 +380,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     };
   }
 
-  private int startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+  private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
+      ChannelHandlerContext ctx) {
     try {
-      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+      return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
         if (reply.isSuccess()) {
           ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
           sendReplySuccess(request, buffer, -1, ctx);
@@ -326,30 +396,17 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
-      }, executorService);
+      }, executor);
     } catch (IOException e) {
       sendReplyNotSuccess(request, ctx);
+      return CompletableFuture.completedFuture(null);
     }
-    return 0;
   }
 
   private void forwardStartTransaction(
       final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
-    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
-    for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
-      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
-        if (reply.isSuccess()) {
-          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
-              ((DataStreamReplyByteBuffer)reply).slice(): null;
-          sendReplySuccess(request, buffer, -1, ctx);
-          return true;
-        } else {
-          return false;
-        }
-      }, executorService);
-
-      results.add(f);
-    }
+    final List<CompletableFuture<Boolean>> results = info.applyToRemotes(
+        out -> out.startTransaction(request, ctx, executor));
 
     JavaUtils.allOf(results).thenAccept(v -> {
       if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
@@ -358,68 +415,46 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
     });
   }
 
-  private boolean isPrimary(RaftPeerId primaryId) {
-    return server.getId().equals(primaryId);
-  }
-
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
+    final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
+
+    if (request.getType() == Type.START_TRANSACTION) {
+      // for peers to start transaction
+      final StreamInfo info = streams.get(key);
+      composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
+          .thenAccept(v -> buf.release());
+      return;
+    }
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
-    final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
-    final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
+    final List<CompletableFuture<DataStreamReply>> remoteWrites;
     if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
-      for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
-        remoteWrites.add(out.getHeaderFuture());
-      }
+      remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
-      final CompletableFuture<?> previous = info.getPrevious().get();
-
-      localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);
-      for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
-        remoteWrites.add(previous.thenComposeAsync(v -> out.writeAsync(request.slice().nioBuffer()), executorService));
-      }
+      localWrite = info.getLocal().write(buf, executor);
+      remoteWrites = info.applyToRemotes(out -> out.write(request, executor));
     } else if (request.getType() == Type.STREAM_CLOSE) {
       info = streams.get(key);
-      final CompletableFuture<?> previous = info.getPrevious().get();
-
-      localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> {
-        try {
-          stream.getWritableByteChannel().close();
-          return 0L;
-        } catch (IOException e) {
-          throw new CompletionException("Failed to close " + stream, e);
-        }
-      }, executorService);
-
-      if (isPrimary(info.getRequest().getServerId())) {
-        for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
-          remoteWrites.add(previous.thenComposeAsync(v -> out.closeAsync(), executorService));
-        }
-      }
-    } else if (request.getType() == Type.START_TRANSACTION) {
-      // peer server start transaction
-      info = streams.get(key);
-      final CompletableFuture<?> previous = info.getPrevious().get();
-      previous.thenApplyAsync(v -> startTransaction(streams.get(key), request, ctx), executorService);
-      return;
+      localWrite = info.getLocal().close(executor);
+      remoteWrites = info.isPrimary()? info.applyToRemotes(out -> out.close(executor)): Collections.emptyList();
     } else {
+      buf.release();
       throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
     }
 
-    final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
+    composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
-          buf.release();
           if (request.getType() == Type.STREAM_HEADER
               || request.getType() == Type.STREAM_DATA) {
             sendReply(remoteWrites, request, bytesWritten, ctx);
           } else if (request.getType() == Type.STREAM_CLOSE) {
-            if (isPrimary(info.getRequest().getServerId())) {
+            if (info.isPrimary()) {
               // after all server close stream, primary server start transaction
               // TODO(runzhiwang): send start transaction to leader directly
               startTransaction(info, request, ctx);
@@ -429,10 +464,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
           } else {
             throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
           }
+          buf.release();
           return null;
-        }, executorService);
-
-    info.getPrevious().set(current);
+        }, executor));
   }
 
   private boolean checkSuccessRemoteWrite(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 5abb03f..472785a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -410,7 +410,7 @@ public interface RaftServerConfigKeys {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
 
     String ASYNC_THREAD_POOL_SIZE_KEY = PREFIX + ".async.thread.pool.size";
-    int ASYNC_THREAD_POOL_SIZE_DEFAULT = 4;
+    int ASYNC_THREAD_POOL_SIZE_DEFAULT = 16;
 
     static int asyncThreadPoolSize(RaftProperties properties) {
       return getInt(properties::getInt, ASYNC_THREAD_POOL_SIZE_KEY, ASYNC_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 010eca8..1b8d261 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -52,7 +52,6 @@ import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
@@ -63,6 +62,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
@@ -73,6 +74,8 @@ abstract class DataStreamBaseTest extends BaseTest {
     return (byte) ('A' + pos%MODULUS);
   }
 
+  private final Executor executor = Executors.newFixedThreadPool(16);
+
   static class MultiDataStreamStateMachine extends BaseStateMachine {
     final ConcurrentMap<Long, SingleDataStream> streams = new ConcurrentHashMap<>();
 
@@ -361,37 +364,42 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
-  protected void runTestDataStream(int numServers, int numClients, int numStreams, int bufferSize, int bufferNum)
-      throws Exception {
+  void runTestDataStream(int numServers) throws Exception {
     try {
       setup(numServers);
-      runTestDataStream(numClients, numStreams, bufferSize, bufferNum);
+      final List<CompletableFuture<Void>> futures = new ArrayList<>();
+      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(5, 10, 1_000_000, 10), executor));
+      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(2, 20, 1_000, 10_000), executor));
+      futures.forEach(CompletableFuture::join);
     } finally {
       shutdown();
     }
   }
 
-  private void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) throws Exception {
+  void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) {
     final List<CompletableFuture<Void>> futures = new ArrayList<>();
-    final List<RaftClient> clients = new ArrayList<>();
-    try {
-      for (int j = 0; j < numClients; j++) {
-        final RaftClient client = newRaftClientForDataStream();
-        clients.add(client);
-        for (int i = 0; i < numStreams; i++) {
-          futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
-              (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum)));
-        }
+    for (int j = 0; j < numClients; j++) {
+      futures.add(CompletableFuture.runAsync(() -> runTestDataStream(numStreams, bufferSize, bufferNum), executor));
+    }
+    Assert.assertEquals(numClients, futures.size());
+    futures.forEach(CompletableFuture::join);
+  }
+
+  void runTestDataStream(int numStreams, int bufferSize, int bufferNum) {
+    final List<CompletableFuture<Void>> futures = new ArrayList<>();
+    try(RaftClient client = newRaftClientForDataStream()) {
+      for (int i = 0; i < numStreams; i++) {
+        futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
+            (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum), executor));
       }
-      Assert.assertEquals(numClients*numStreams, futures.size());
+      Assert.assertEquals(numStreams, futures.size());
       futures.forEach(CompletableFuture::join);
-    } finally {
-      for (int j = 0; j < clients.size(); j++) {
-        clients.get(j).close();
-      }
+    } catch (IOException e) {
+      throw new CompletionException(e);
     }
   }
 
+
   void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream()) {
@@ -409,6 +417,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
   private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+    LOG.info("start stream {}", out.getHeader().getCallId());
     final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
     final List<Integer> sizes = new ArrayList<>();
 
@@ -450,14 +459,19 @@ abstract class DataStreamBaseTest extends BaseTest {
   void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
     final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
     final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
-    final RaftClientRequest writeRequest = stream.getWriteRequest();
-    Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
-    Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
     Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
-    Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
-    Assert.assertEquals(writeRequest.getClientId(), header.getClientId());
-    Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
+
+    final RaftClientRequest writeRequest = stream.getWriteRequest();
+    assertRaftClientRequest(header, writeRequest);
+  }
+
+  static void assertRaftClientRequest(RaftClientRequest expected, RaftClientRequest computed) {
+    Assert.assertNotNull(computed);
+    Assert.assertEquals(expected.getClientId(), computed.getClientId());
+    Assert.assertEquals(expected.getServerId(), computed.getServerId());
+    Assert.assertEquals(expected.getRaftGroupId(), computed.getRaftGroupId());
+    Assert.assertEquals(expected.getCallId(), computed.getCallId());
   }
 
   static ByteBuffer initBuffer(int offset, int size) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 3a99815..f9fce8b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -70,14 +70,12 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
 
   @Test
   public void testDataStreamSingleServer() throws Exception {
-    runTestDataStream(1, 5, 10, 1_000_000, 10);
-    runTestDataStream(1, 2, 20, 1_000, 10_000);
+    runTestDataStream(1);
   }
 
   @Test
   public void testDataStreamMultipleServer() throws Exception {
-    runTestDataStream(3, 5, 10, 1_000_000, 10);
-    runTestDataStream(3, 2, 20, 1_000, 10_000);
+    runTestDataStream(3);
   }
 
   private void testCloseStream(int leaderIndex, int numServers) throws Exception {