You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/06 08:03:50 UTC
[incubator-ratis] branch master updated: RATIS-1133. Primary and
peer should use the same RaftClientRequest (#258)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 31381b6 RATIS-1133. Primary and peer should use the same RaftClientRequest (#258)
31381b6 is described below
commit 31381b69f87587fd34fc252ba5732b741cdbd2b8
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Nov 6 16:01:14 2020 +0800
RATIS-1133. Primary and peer should use the same RaftClientRequest (#258)
---
.../org/apache/ratis/client/DataStreamRpcApi.java | 4 ++--
.../ratis/client/impl/DataStreamClientImpl.java | 21 +++++++++------------
.../ratis/netty/server/NettyServerStreamRpc.java | 15 +++++++--------
.../apache/ratis/datastream/DataStreamBaseTest.java | 7 ++-----
4 files changed, 20 insertions(+), 27 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java
index 409e4e1..b7b5e3e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamRpcApi.java
@@ -20,10 +20,10 @@ package org.apache.ratis.client;
import org.apache.ratis.client.api.DataStreamApi;
import org.apache.ratis.client.api.DataStreamOutput;
-import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftClientRequest;
/** An RPC interface which extends the user interface {@link DataStreamApi}. */
public interface DataStreamRpcApi extends DataStreamApi {
/** Create a stream for primary server to send data to peer server. */
- DataStreamOutput stream(RaftGroupId groupId, long streamId);
+ DataStreamOutput stream(RaftClientRequest request);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 9efe56c..bf54fb9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -63,20 +63,15 @@ public class DataStreamClientImpl implements DataStreamClient {
this.orderedStreamAsync = new OrderedStreamAsync(clientId, dataStreamClientRpc, properties);
}
- public class DataStreamOutputImpl implements DataStreamOutputRpc {
+ public final class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
private long streamOffset = 0;
- public DataStreamOutputImpl(RaftGroupId groupId) {
- this(groupId, RaftClientImpl.nextCallId());
- }
-
- public DataStreamOutputImpl(RaftGroupId groupId, long streamId) {
- this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, streamId,
- RaftClientRequest.writeRequestType());
- this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
+ private DataStreamOutputImpl(RaftClientRequest request) {
+ this.header = request;
+ this.headerFuture = orderedStreamAsync.sendRequest(request.getCallId(), -1,
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
}
@@ -133,12 +128,14 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutputRpc stream(RaftGroupId gid) {
- return new DataStreamOutputImpl(gid);
+ RaftClientRequest request = new RaftClientRequest(
+ clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
+ return new DataStreamOutputImpl(request);
}
@Override
- public DataStreamOutputRpc stream(RaftGroupId gid, long streamId) {
- return new DataStreamOutputImpl(gid, streamId);
+ public DataStreamOutputRpc stream(RaftClientRequest request) {
+ return new DataStreamOutputImpl(request);
}
@Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index f983684..6d3143a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -31,7 +31,6 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
@@ -96,10 +95,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
peers.addAll(newPeers);
}
- List<DataStreamOutputRpc> getDataStreamOutput(RaftGroupId groupId, long streamId) throws IOException {
+ List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request) throws IOException {
final List<DataStreamOutputRpc> outs = new ArrayList<>();
try {
- getDataStreamOutput(outs, groupId, streamId);
+ getDataStreamOutput(outs, request);
} catch (IOException e) {
outs.forEach(CloseAsync::closeAsync);
throw e;
@@ -107,11 +106,11 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return outs;
}
- private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftGroupId groupId, long streamId)
+ private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftClientRequest request)
throws IOException {
for (RaftPeer peer : peers) {
try {
- outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(groupId, streamId));
+ outs.add((DataStreamOutputRpc) map.getProxy(peer.getId()).stream(request));
} catch (IOException e) {
throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
}
@@ -248,13 +247,13 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
proxies.addPeers(newPeers);
}
- private StreamInfo newStreamInfo(ByteBuf buf, long streamId) {
+ private StreamInfo newStreamInfo(ByteBuf buf) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
return new StreamInfo(request, stateMachine.data().stream(request),
- proxies.getDataStreamOutput(request.getRaftGroupId(), streamId));
+ proxies.getDataStreamOutput(request));
} catch (Throwable e) {
throw new CompletionException(e);
}
@@ -365,7 +364,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
if (request.getType() == Type.STREAM_HEADER) {
- info = streams.computeIfAbsent(key, id -> newStreamInfo(buf, request.getStreamId()));
+ info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
remoteWrites.add(out.getHeaderFuture());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 668cb17..51c6bb7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -474,11 +474,8 @@ abstract class DataStreamBaseTest extends BaseTest {
Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(dataSize, stream.getByteWritten());
Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
-
- final Server primary = getPrimaryServer();
- if (server == primary) {
- Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
- }
+ Assert.assertEquals(writeRequest.getClientId(), header.getClientId());
+ Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
}
static ByteBuffer initBuffer(int offset, int size) {