You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/30 08:59:30 UTC

[incubator-ratis] branch master updated: RATIS-1207. Fix duplicated StreamMap#Key (#385)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 918c7c3  RATIS-1207. Fix duplicated StreamMap#Key (#385)
918c7c3 is described below

commit 918c7c3b1c1593c0e70fef0a960766edb256932f
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 30 16:59:20 2020 +0800

    RATIS-1207. Fix duplicated StreamMap#Key (#385)
---
 .../apache/ratis/client/impl/DataStreamClientImpl.java   |  2 +-
 .../datastream/impl/DataStreamPacketByteBuffer.java      |  7 +++++--
 .../ratis/datastream/impl/DataStreamPacketImpl.java      | 15 ++++++++++++---
 .../ratis/datastream/impl/DataStreamReplyByteBuffer.java | 16 ++++++++++++----
 .../datastream/impl/DataStreamRequestByteBuffer.java     |  2 +-
 .../impl/DataStreamRequestFilePositionCount.java         |  2 +-
 .../java/org/apache/ratis/protocol/DataStreamPacket.java |  2 ++
 .../apache/ratis/protocol/DataStreamPacketHeader.java    |  4 ++--
 .../org/apache/ratis/protocol/DataStreamReplyHeader.java |  8 ++++----
 .../apache/ratis/protocol/DataStreamRequestHeader.java   |  9 +++++----
 .../org/apache/ratis/netty/NettyDataStreamUtils.java     |  2 ++
 .../apache/ratis/netty/server/DataStreamManagement.java  | 16 ++++++++++------
 .../ratis/netty/server/DataStreamRequestByteBuf.java     |  9 ++++++---
 ratis-proto/src/main/proto/Raft.proto                    |  1 +
 .../ratis/datastream/DataStreamAsyncClusterTests.java    |  2 +-
 .../org/apache/ratis/datastream/DataStreamBaseTest.java  |  2 +-
 .../org/apache/ratis/datastream/DataStreamTestUtils.java |  7 ++++---
 17 files changed, 70 insertions(+), 36 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 98aa575..29356f6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -109,7 +109,7 @@ public class DataStreamClientImpl implements DataStreamClient {
 
     private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) {
       final DataStreamRequestHeader h =
-          new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length, options);
+          new DataStreamRequestHeader(header.getClientId(), type, header.getCallId(), streamOffset, length, options);
       return orderedStreamAsync.sendRequest(h, data);
     }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index aa8cddb..b8d7b48 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -18,6 +18,8 @@
 package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.ClientId;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -28,8 +30,9 @@ public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
 
   private final ByteBuffer buffer;
 
-  protected DataStreamPacketByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
-    super(type, streamId, streamOffset);
+  protected DataStreamPacketByteBuffer(ClientId clientId, Type type, long streamId, long streamOffset,
+      ByteBuffer buffer) {
+    super(clientId, type, streamId, streamOffset);
     this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY_BYTE_BUFFER;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index 1f528e3..6be0300 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.util.JavaUtils;
@@ -27,17 +28,24 @@ import org.apache.ratis.util.JavaUtils;
  * This class is immutable.
  */
 public abstract class DataStreamPacketImpl implements DataStreamPacket {
+  private final ClientId clientId;
   private final Type type;
   private final long streamId;
   private final long streamOffset;
 
-  public DataStreamPacketImpl(Type type, long streamId, long streamOffset) {
+  public DataStreamPacketImpl(ClientId clientId, Type type, long streamId, long streamOffset) {
+    this.clientId = clientId;
     this.type = type;
     this.streamId = streamId;
     this.streamOffset = streamOffset;
   }
 
   @Override
+  public ClientId getClientId() {
+    return clientId;
+  }
+
+  @Override
   public Type getType() {
     return type;
   }
@@ -54,8 +62,9 @@ public abstract class DataStreamPacketImpl implements DataStreamPacket {
 
   @Override
   public String toString() {
-    return JavaUtils.getClassSimpleName(getClass()) + ":"
-        + getType()
+    return JavaUtils.getClassSimpleName(getClass())
+        + ":clientId=" + getClientId()
+        + ",type=" + getType()
         + ",id=" + getStreamId()
         + ",offset=" + getStreamOffset()
         + ",length=" + getDataLength();
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index a681716..3ef6251 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamPacket;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
  */
 public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
   public static final class Builder {
+    private ClientId clientId;
     private Type type;
     private long streamId;
     private long streamOffset;
@@ -41,6 +43,11 @@ public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
 
     private Builder() {}
 
+    public Builder setClientId(ClientId clientId) {
+      this.clientId = clientId;
+      return this;
+    }
+
     public Builder setType(Type type) {
       this.type = type;
       return this;
@@ -78,13 +85,14 @@ public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
     }
 
     public Builder setDataStreamPacket(DataStreamPacket packet) {
-      return setType(packet.getType())
+      return setClientId(packet.getClientId())
+          .setType(packet.getType())
           .setStreamId(packet.getStreamId())
           .setStreamOffset(packet.getStreamOffset());
     }
 
     public DataStreamReplyByteBuffer build() {
-      return new DataStreamReplyByteBuffer(type, streamId, streamOffset, buffer, success, bytesWritten);
+      return new DataStreamReplyByteBuffer(clientId, type, streamId, streamOffset, buffer, success, bytesWritten);
     }
   }
 
@@ -95,9 +103,9 @@ public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
   private final boolean success;
   private final long bytesWritten;
 
-  private DataStreamReplyByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer,
+  private DataStreamReplyByteBuffer(ClientId clientId, Type type, long streamId, long streamOffset, ByteBuffer buffer,
       boolean success, long bytesWritten) {
-    super(type, streamId, streamOffset, buffer);
+    super(clientId, type, streamId, streamOffset, buffer);
 
     this.success = success;
     this.bytesWritten = bytesWritten;
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 3dc9137..1433acb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -33,7 +33,7 @@ public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer impl
   private WriteOption[] options;
 
   public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
-    super(header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
+    super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
     this.options = header.getWriteOptions();
     Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index fd60bf8..cc68a9a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -32,7 +32,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
   private WriteOption[] options;
 
   public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
-    super(header.getType(), header.getStreamId(), header.getStreamOffset());
+    super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset());
     this.options = header.getWriteOptions();
     this.file = file;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index 9da90c8..caebb9e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -21,6 +21,8 @@ package org.apache.ratis.protocol;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 
 public interface DataStreamPacket {
+  ClientId getClientId();
+
   Type getType();
 
   long getStreamId();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 18a2b6c..3bd7512 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -28,8 +28,8 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
 
   private final long dataLength;
 
-  public DataStreamPacketHeader(Type type, long streamId, long streamOffset, long dataLength) {
-    super(type, streamId, streamOffset);
+  public DataStreamPacketHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength) {
+    super(clientId, type, streamId, streamOffset);
     this.dataLength = dataLength;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index d02c933..9e3d4d4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -48,8 +48,8 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
       final DataStreamPacketHeaderProto h = header.getPacketHeader();
       if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
         buf.readerIndex(buf.readerIndex() + headerBufLen);
-        return new DataStreamReplyHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
-            header.getBytesWritten(), header.getSuccess());
+        return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+            h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), header.getSuccess());
       } else {
         buf.resetReaderIndex();
         return null;
@@ -64,9 +64,9 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
   private final long bytesWritten;
   private final boolean success;
 
-  public DataStreamReplyHeader(Type type, long streamId, long streamOffset, long dataLength,
+  public DataStreamReplyHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength,
       long bytesWritten, boolean success) {
-    super(type, streamId, streamOffset, dataLength);
+    super(clientId, type, streamId, streamOffset, dataLength);
     this.bytesWritten = bytesWritten;
     this.success = success;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 561f4ba..5affdbe 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -58,8 +58,8 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
           options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
         }
 
-        return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
-            options);
+        return new DataStreamRequestHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+            h.getStreamOffset(), h.getDataLength(), options);
       } else {
         buf.resetReaderIndex();
         return null;
@@ -73,8 +73,9 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
 
   private final WriteOption[] options;
 
-  public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength, WriteOption... options) {
-    super(type, streamId, streamOffset, dataLength);
+  public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength,
+      WriteOption... options) {
+    super(clientId, type, streamId, streamOffset, dataLength);
     this.options = options;
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index e6111f5..c35da3e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -45,6 +45,7 @@ public interface NettyDataStreamUtils {
   static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
     DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
         .newBuilder()
+        .setClientId(request.getClientId().toByteString())
         .setStreamId(request.getStreamId())
         .setStreamOffset(request.getStreamOffset())
         .setType(request.getType())
@@ -64,6 +65,7 @@ public interface NettyDataStreamUtils {
   static ByteBuffer getDataStreamReplyHeaderProtoByteBuf(DataStreamReplyByteBuffer reply) {
     DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
         .newBuilder()
+        .setClientId(reply.getClientId().toByteString())
         .setStreamId(reply.getStreamId())
         .setStreamOffset(reply.getStreamOffset())
         .setType(reply.getType())
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 2bde5f0..8ad60e8 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -170,10 +170,12 @@ public class DataStreamManagement {
   static class StreamMap {
     static class Key {
       private final ChannelId channelId;
+      private final ClientId clientId;
       private final long streamId;
 
-      Key(ChannelId channelId, long streamId) {
+      Key(ChannelId channelId, ClientId clientId, long streamId) {
         this.channelId = channelId;
+        this.clientId = clientId;
         this.streamId = streamId;
       }
 
@@ -185,17 +187,19 @@ public class DataStreamManagement {
           return false;
         }
         final Key that = (Key) obj;
-        return this.streamId == that.streamId && Objects.equals(this.channelId, that.channelId);
+        return this.clientId.equals(that.clientId)
+            && this.streamId == that.streamId
+            && Objects.equals(this.channelId, that.channelId);
       }
 
       @Override
       public int hashCode() {
-        return Objects.hash(channelId, streamId);
+        return Objects.hash(channelId, clientId, streamId);
       }
 
       @Override
       public String toString() {
-        return channelId + "-" + streamId;
+        return channelId + "-" + clientId + "-" + streamId;
       }
     }
 
@@ -375,14 +379,14 @@ public class DataStreamManagement {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
-    final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
+    final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getClientId(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
       final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
       info = streams.computeIfAbsent(key, id -> supplier.get());
       if (!supplier.isInitialized()) {
         throw new IllegalStateException("Failed to create a new stream for " + request
-            + " since a stream already exists: " + info);
+            + " since a stream already exists Key: " + key + " StreamInfo:" + info);
       }
     } else if (close) {
       info = Optional.ofNullable(streams.remove(key)).orElseThrow(
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 40cb8e7..e870992 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -20,6 +20,7 @@ package org.apache.ratis.netty.server;
 
 import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
 import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
@@ -35,14 +36,16 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
   private final ByteBuf buf;
   private final WriteOption[] options;
 
-  public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, WriteOption[] options, ByteBuf buf) {
-    super(type, streamId, streamOffset);
+  public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, WriteOption[] options,
+      ByteBuf buf) {
+    super(clientId, type, streamId, streamOffset);
     this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
     this.options = options;
   }
 
   public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
-    this(header.getType(), header.getStreamId(), header.getStreamOffset(), header.getWriteOptions(), buf);
+    this(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(),
+        header.getWriteOptions(), buf);
   }
 
   @Override
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 822783a..b63b93a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -327,6 +327,7 @@ message DataStreamPacketHeaderProto {
   Type type = 3;
   repeated Option options = 4;
   uint64 dataLength = 5;
+  bytes clientId = 6;
 }
 
 message DataStreamRequestHeaderProto {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index fc61148..826facb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -149,7 +149,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
         final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
             .stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer));
         futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
-            servers, leader, out, bufferSize, bufferNum, primaryClientId, cluster, stepDownLeader).join(), executor));
+            servers, leader, out, bufferSize, bufferNum, primaryClientId, client.getId(), stepDownLeader).join(), executor));
       }
       Assert.assertEquals(numStreams, futures.size());
       return futures.stream()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 382b31e..ac0e9c1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -145,7 +145,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       final RaftClientReply clientReply = DataStreamTestUtils.writeAndCloseAndAssertReplies(
           CollectionUtils.as(servers, Server::getRaftServer), null, out, bufferSize, bufferNum,
-          getPrimaryClientId(), null, false).join();
+          getPrimaryClientId(), client.getId(), false).join();
       if (expectedException != null) {
         Assert.assertFalse(clientReply.isSuccess());
         Assert.assertTrue(clientReply.getException().getMessage().contains(
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 89eed98..14d5fe5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -288,7 +288,7 @@ public interface DataStreamTestUtils {
 
   static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
       Iterable<RaftServer> servers, RaftPeerId leader, DataStreamOutputImpl out, int bufferSize, int bufferNum,
-      ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
+      ClientId primaryClientId, ClientId clientId, boolean stepDownLeader) {
     LOG.info("start Stream{}", out.getHeader().getCallId());
     final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
     try {
@@ -301,7 +301,7 @@ public interface DataStreamTestUtils {
     LOG.info("Stream{}: bytesWritten={}", out.getHeader().getCallId(), bytesWritten);
 
     return out.closeAsync().thenCompose(
-        reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId, cluster, stepDownLeader));
+        reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId, clientId, stepDownLeader));
   }
 
   static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize, boolean stepDownLeader)
@@ -323,9 +323,10 @@ public interface DataStreamTestUtils {
   }
 
   static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
-      long bytesWritten, RaftPeerId leader, ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
+      long bytesWritten, RaftPeerId leader, ClientId primaryClientId, ClientId clientId, boolean stepDownLeader) {
     // Test close idempotent
     Assert.assertSame(dataStreamReply, out.closeAsync().join());
+    Assert.assertEquals(dataStreamReply.getClientId(), clientId);
     BaseTest.testFailureCase("writeAsync should fail",
         () -> out.writeAsync(DataStreamRequestByteBuffer.EMPTY_BYTE_BUFFER).join(),
         CompletionException.class, (Logger) null, AlreadyClosedException.class);