You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/03 02:39:45 UTC
[incubator-ratis] branch master updated: RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c77c183 RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
c77c183 is described below
commit c77c183c31d7d09b497c4f6500dc230fe5d82f43
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Nov 2 18:39:28 2020 -0800
RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
---
.../org/apache/ratis/netty/server/NettyServerStreamRpc.java | 12 +++++++-----
.../java/org/apache/ratis/datastream/DataStreamBaseTest.java | 5 ++++-
2 files changed, 11 insertions(+), 6 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 29a85cf..041d64d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -30,6 +30,7 @@ import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
@@ -94,10 +95,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
peers.addAll(newPeers);
}
- List<DataStreamOutput> getDataStreamOutput() throws IOException {
+ List<DataStreamOutput> getDataStreamOutput(RaftGroupId groupId) throws IOException {
final List<DataStreamOutput> outs = new ArrayList<>();
try {
- getDataStreamOutput(outs);
+ getDataStreamOutput(outs, groupId);
} catch (IOException e) {
outs.forEach(CloseAsync::closeAsync);
throw e;
@@ -105,10 +106,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return outs;
}
- private void getDataStreamOutput(List<DataStreamOutput> outs) throws IOException {
+ private void getDataStreamOutput(List<DataStreamOutput> outs, RaftGroupId groupId) throws IOException {
for (RaftPeer peer : peers) {
try {
- outs.add(map.getProxy(peer.getId()).stream());
+ outs.add(map.getProxy(peer.getId()).stream(groupId));
} catch (IOException e) {
throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
}
@@ -245,7 +246,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
- return new StreamInfo(request, stateMachine.data().stream(request), proxies.getDataStreamOutput());
+ return new StreamInfo(request, stateMachine.data().stream(request),
+ proxies.getDataStreamOutput(request.getRaftGroupId()));
} catch (Throwable e) {
throw new CompletionException(e);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 739767a..67d2ecc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -182,6 +182,7 @@ abstract class DataStreamBaseTest extends BaseTest {
protected RaftProperties properties;
private List<Server> servers;
+ private RaftGroup raftGroup;
Server getPrimaryServer() {
return servers.get(0);
@@ -318,6 +319,7 @@ abstract class DataStreamBaseTest extends BaseTest {
.map(RaftPeerId::valueOf)
.map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
.collect(Collectors.toList());
+ raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
servers = new ArrayList<>(peers.size());
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
@@ -363,7 +365,7 @@ abstract class DataStreamBaseTest extends BaseTest {
clients.add(client);
for (int i = 0; i < numStreams; i++) {
futures.add(CompletableFuture.runAsync(
- () -> runTestDataStream((DataStreamOutputImpl) client.stream(), bufferSize, bufferNum)));
+ () -> runTestDataStream((DataStreamOutputImpl) client.stream(raftGroup.getGroupId()), bufferSize, bufferNum)));
}
}
Assert.assertEquals(numClients*numStreams, futures.size());
@@ -419,6 +421,7 @@ abstract class DataStreamBaseTest extends BaseTest {
final RaftClientRequest writeRequest = stream.getWriteRequest();
Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
+ Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
Assert.assertEquals(dataSize, stream.getByteWritten());
}