You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/30 08:59:30 UTC
[incubator-ratis] branch master updated: RATIS-1207. Fix duplicated StreamMap#Key (#385)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 918c7c3 RATIS-1207. Fix duplicated StreamMap#Key (#385)
918c7c3 is described below
commit 918c7c3b1c1593c0e70fef0a960766edb256932f
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 30 16:59:20 2020 +0800
RATIS-1207. Fix duplicated StreamMap#Key (#385)
---
.../apache/ratis/client/impl/DataStreamClientImpl.java | 2 +-
.../datastream/impl/DataStreamPacketByteBuffer.java | 7 +++++--
.../ratis/datastream/impl/DataStreamPacketImpl.java | 15 ++++++++++++---
.../ratis/datastream/impl/DataStreamReplyByteBuffer.java | 16 ++++++++++++----
.../datastream/impl/DataStreamRequestByteBuffer.java | 2 +-
.../impl/DataStreamRequestFilePositionCount.java | 2 +-
.../java/org/apache/ratis/protocol/DataStreamPacket.java | 2 ++
.../apache/ratis/protocol/DataStreamPacketHeader.java | 4 ++--
.../org/apache/ratis/protocol/DataStreamReplyHeader.java | 8 ++++----
.../apache/ratis/protocol/DataStreamRequestHeader.java | 9 +++++----
.../org/apache/ratis/netty/NettyDataStreamUtils.java | 2 ++
.../apache/ratis/netty/server/DataStreamManagement.java | 16 ++++++++++------
.../ratis/netty/server/DataStreamRequestByteBuf.java | 9 ++++++---
ratis-proto/src/main/proto/Raft.proto | 1 +
.../ratis/datastream/DataStreamAsyncClusterTests.java | 2 +-
.../org/apache/ratis/datastream/DataStreamBaseTest.java | 2 +-
.../org/apache/ratis/datastream/DataStreamTestUtils.java | 7 ++++---
17 files changed, 70 insertions(+), 36 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 98aa575..29356f6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -109,7 +109,7 @@ public class DataStreamClientImpl implements DataStreamClient {
private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) {
final DataStreamRequestHeader h =
- new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length, options);
+ new DataStreamRequestHeader(header.getClientId(), type, header.getCallId(), streamOffset, length, options);
return orderedStreamAsync.sendRequest(h, data);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index aa8cddb..b8d7b48 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -18,6 +18,8 @@
package org.apache.ratis.datastream.impl;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.ClientId;
+
import java.nio.ByteBuffer;
/**
@@ -28,8 +30,9 @@ public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
private final ByteBuffer buffer;
- protected DataStreamPacketByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
- super(type, streamId, streamOffset);
+ protected DataStreamPacketByteBuffer(ClientId clientId, Type type, long streamId, long streamOffset,
+ ByteBuffer buffer) {
+ super(clientId, type, streamId, streamOffset);
this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY_BYTE_BUFFER;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
index 1f528e3..6be0300 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.util.JavaUtils;
@@ -27,17 +28,24 @@ import org.apache.ratis.util.JavaUtils;
* This class is immutable.
*/
public abstract class DataStreamPacketImpl implements DataStreamPacket {
+ private final ClientId clientId;
private final Type type;
private final long streamId;
private final long streamOffset;
- public DataStreamPacketImpl(Type type, long streamId, long streamOffset) {
+ public DataStreamPacketImpl(ClientId clientId, Type type, long streamId, long streamOffset) {
+ this.clientId = clientId;
this.type = type;
this.streamId = streamId;
this.streamOffset = streamOffset;
}
@Override
+ public ClientId getClientId() {
+ return clientId;
+ }
+
+ @Override
public Type getType() {
return type;
}
@@ -54,8 +62,9 @@ public abstract class DataStreamPacketImpl implements DataStreamPacket {
@Override
public String toString() {
- return JavaUtils.getClassSimpleName(getClass()) + ":"
- + getType()
+ return JavaUtils.getClassSimpleName(getClass())
+ + ":clientId=" + getClientId()
+ + ",type=" + getType()
+ ",id=" + getStreamId()
+ ",offset=" + getStreamOffset()
+ ",length=" + getDataLength();
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
index a681716..3ef6251 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.datastream.impl;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamReplyHeader;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
*/
public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer implements DataStreamReply {
public static final class Builder {
+ private ClientId clientId;
private Type type;
private long streamId;
private long streamOffset;
@@ -41,6 +43,11 @@ public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
private Builder() {}
+ public Builder setClientId(ClientId clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
public Builder setType(Type type) {
this.type = type;
return this;
@@ -78,13 +85,14 @@ public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
}
public Builder setDataStreamPacket(DataStreamPacket packet) {
- return setType(packet.getType())
+ return setClientId(packet.getClientId())
+ .setType(packet.getType())
.setStreamId(packet.getStreamId())
.setStreamOffset(packet.getStreamOffset());
}
public DataStreamReplyByteBuffer build() {
- return new DataStreamReplyByteBuffer(type, streamId, streamOffset, buffer, success, bytesWritten);
+ return new DataStreamReplyByteBuffer(clientId, type, streamId, streamOffset, buffer, success, bytesWritten);
}
}
@@ -95,9 +103,9 @@ public final class DataStreamReplyByteBuffer extends DataStreamPacketByteBuffer
private final boolean success;
private final long bytesWritten;
- private DataStreamReplyByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer,
+ private DataStreamReplyByteBuffer(ClientId clientId, Type type, long streamId, long streamOffset, ByteBuffer buffer,
boolean success, long bytesWritten) {
- super(type, streamId, streamOffset, buffer);
+ super(clientId, type, streamId, streamOffset, buffer);
this.success = success;
this.bytesWritten = bytesWritten;
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 3dc9137..1433acb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -33,7 +33,7 @@ public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer impl
private WriteOption[] options;
public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
- super(header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
+ super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
this.options = header.getWriteOptions();
Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index fd60bf8..cc68a9a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -32,7 +32,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
private WriteOption[] options;
public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
- super(header.getType(), header.getStreamId(), header.getStreamOffset());
+ super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset());
this.options = header.getWriteOptions();
this.file = file;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
index 9da90c8..caebb9e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacket.java
@@ -21,6 +21,8 @@ package org.apache.ratis.protocol;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
public interface DataStreamPacket {
+ ClientId getClientId();
+
Type getType();
long getStreamId();
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index 18a2b6c..3bd7512 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -28,8 +28,8 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
private final long dataLength;
- public DataStreamPacketHeader(Type type, long streamId, long streamOffset, long dataLength) {
- super(type, streamId, streamOffset);
+ public DataStreamPacketHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength) {
+ super(clientId, type, streamId, streamOffset);
this.dataLength = dataLength;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index d02c933..9e3d4d4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -48,8 +48,8 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
final DataStreamPacketHeaderProto h = header.getPacketHeader();
if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
buf.readerIndex(buf.readerIndex() + headerBufLen);
- return new DataStreamReplyHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
- header.getBytesWritten(), header.getSuccess());
+ return new DataStreamReplyHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+ h.getStreamOffset(), h.getDataLength(), header.getBytesWritten(), header.getSuccess());
} else {
buf.resetReaderIndex();
return null;
@@ -64,9 +64,9 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
private final long bytesWritten;
private final boolean success;
- public DataStreamReplyHeader(Type type, long streamId, long streamOffset, long dataLength,
+ public DataStreamReplyHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength,
long bytesWritten, boolean success) {
- super(type, streamId, streamOffset, dataLength);
+ super(clientId, type, streamId, streamOffset, dataLength);
this.bytesWritten = bytesWritten;
this.success = success;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 561f4ba..5affdbe 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -58,8 +58,8 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
}
- return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
- options);
+ return new DataStreamRequestHeader(ClientId.valueOf(h.getClientId()), h.getType(), h.getStreamId(),
+ h.getStreamOffset(), h.getDataLength(), options);
} else {
buf.resetReaderIndex();
return null;
@@ -73,8 +73,9 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
private final WriteOption[] options;
- public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength, WriteOption... options) {
- super(type, streamId, streamOffset, dataLength);
+ public DataStreamRequestHeader(ClientId clientId, Type type, long streamId, long streamOffset, long dataLength,
+ WriteOption... options) {
+ super(clientId, type, streamId, streamOffset, dataLength);
this.options = options;
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index e6111f5..c35da3e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -45,6 +45,7 @@ public interface NettyDataStreamUtils {
static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
.newBuilder()
+ .setClientId(request.getClientId().toByteString())
.setStreamId(request.getStreamId())
.setStreamOffset(request.getStreamOffset())
.setType(request.getType())
@@ -64,6 +65,7 @@ public interface NettyDataStreamUtils {
static ByteBuffer getDataStreamReplyHeaderProtoByteBuf(DataStreamReplyByteBuffer reply) {
DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
.newBuilder()
+ .setClientId(reply.getClientId().toByteString())
.setStreamId(reply.getStreamId())
.setStreamOffset(reply.getStreamOffset())
.setType(reply.getType())
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 2bde5f0..8ad60e8 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -170,10 +170,12 @@ public class DataStreamManagement {
static class StreamMap {
static class Key {
private final ChannelId channelId;
+ private final ClientId clientId;
private final long streamId;
- Key(ChannelId channelId, long streamId) {
+ Key(ChannelId channelId, ClientId clientId, long streamId) {
this.channelId = channelId;
+ this.clientId = clientId;
this.streamId = streamId;
}
@@ -185,17 +187,19 @@ public class DataStreamManagement {
return false;
}
final Key that = (Key) obj;
- return this.streamId == that.streamId && Objects.equals(this.channelId, that.channelId);
+ return this.clientId.equals(that.clientId)
+ && this.streamId == that.streamId
+ && Objects.equals(this.channelId, that.channelId);
}
@Override
public int hashCode() {
- return Objects.hash(channelId, streamId);
+ return Objects.hash(channelId, clientId, streamId);
}
@Override
public String toString() {
- return channelId + "-" + streamId;
+ return channelId + "-" + clientId + "-" + streamId;
}
}
@@ -375,14 +379,14 @@ public class DataStreamManagement {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
- final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
+ final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getClientId(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
throw new IllegalStateException("Failed to create a new stream for " + request
- + " since a stream already exists: " + info);
+ + " since a stream already exists Key: " + key + " StreamInfo:" + info);
}
} else if (close) {
info = Optional.ofNullable(streams.remove(key)).orElseThrow(
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 40cb8e7..e870992 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -20,6 +20,7 @@ package org.apache.ratis.netty.server;
import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
import org.apache.ratis.io.WriteOption;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
@@ -35,14 +36,16 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
private final ByteBuf buf;
private final WriteOption[] options;
- public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, WriteOption[] options, ByteBuf buf) {
- super(type, streamId, streamOffset);
+ public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset, WriteOption[] options,
+ ByteBuf buf) {
+ super(clientId, type, streamId, streamOffset);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
this.options = options;
}
public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
- this(header.getType(), header.getStreamId(), header.getStreamOffset(), header.getWriteOptions(), buf);
+ this(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(),
+ header.getWriteOptions(), buf);
}
@Override
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 822783a..b63b93a 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -327,6 +327,7 @@ message DataStreamPacketHeaderProto {
Type type = 3;
repeated Option options = 4;
uint64 dataLength = 5;
+ bytes clientId = 6;
}
message DataStreamRequestHeaderProto {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index fc61148..826facb 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -149,7 +149,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi()
.stream(null, getRoutingTable(cluster.getGroup().getPeers(), primaryServer));
futures.add(CompletableFuture.supplyAsync(() -> DataStreamTestUtils.writeAndCloseAndAssertReplies(
- servers, leader, out, bufferSize, bufferNum, primaryClientId, cluster, stepDownLeader).join(), executor));
+ servers, leader, out, bufferSize, bufferNum, primaryClientId, client.getId(), stepDownLeader).join(), executor));
}
Assert.assertEquals(numStreams, futures.size());
return futures.stream()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 382b31e..ac0e9c1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -145,7 +145,7 @@ abstract class DataStreamBaseTest extends BaseTest {
final RaftClientReply clientReply = DataStreamTestUtils.writeAndCloseAndAssertReplies(
CollectionUtils.as(servers, Server::getRaftServer), null, out, bufferSize, bufferNum,
- getPrimaryClientId(), null, false).join();
+ getPrimaryClientId(), client.getId(), false).join();
if (expectedException != null) {
Assert.assertFalse(clientReply.isSuccess());
Assert.assertTrue(clientReply.getException().getMessage().contains(
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 89eed98..14d5fe5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -288,7 +288,7 @@ public interface DataStreamTestUtils {
static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
Iterable<RaftServer> servers, RaftPeerId leader, DataStreamOutputImpl out, int bufferSize, int bufferNum,
- ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
+ ClientId primaryClientId, ClientId clientId, boolean stepDownLeader) {
LOG.info("start Stream{}", out.getHeader().getCallId());
final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
try {
@@ -301,7 +301,7 @@ public interface DataStreamTestUtils {
LOG.info("Stream{}: bytesWritten={}", out.getHeader().getCallId(), bytesWritten);
return out.closeAsync().thenCompose(
- reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId, cluster, stepDownLeader));
+ reply -> assertCloseReply(out, reply, bytesWritten, leader, primaryClientId, clientId, stepDownLeader));
}
static void assertHeader(RaftServer server, RaftClientRequest header, int dataSize, boolean stepDownLeader)
@@ -323,9 +323,10 @@ public interface DataStreamTestUtils {
}
static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
- long bytesWritten, RaftPeerId leader, ClientId primaryClientId, MiniRaftCluster cluster, boolean stepDownLeader) {
+ long bytesWritten, RaftPeerId leader, ClientId primaryClientId, ClientId clientId, boolean stepDownLeader) {
// Test close idempotent
Assert.assertSame(dataStreamReply, out.closeAsync().join());
+ Assert.assertEquals(dataStreamReply.getClientId(), clientId);
BaseTest.testFailureCase("writeAsync should fail",
() -> out.writeAsync(DataStreamRequestByteBuffer.EMPTY_BYTE_BUFFER).join(),
CompletionException.class, (Logger) null, AlreadyClosedException.class);