You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/03 02:39:45 UTC

[incubator-ratis] branch master updated: RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c77c183  RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
c77c183 is described below

commit c77c183c31d7d09b497c4f6500dc230fe5d82f43
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Nov 2 18:39:28 2020 -0800

    RATIS-1084. Support multiple groups in Streaming (#245). Contributed by Rui Wang
---
 .../org/apache/ratis/netty/server/NettyServerStreamRpc.java  | 12 +++++++-----
 .../java/org/apache/ratis/datastream/DataStreamBaseTest.java |  5 ++++-
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 29a85cf..041d64d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -30,6 +30,7 @@ import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
@@ -94,10 +95,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       peers.addAll(newPeers);
     }
 
-    List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    List<DataStreamOutput> getDataStreamOutput(RaftGroupId groupId) throws IOException {
       final List<DataStreamOutput> outs = new ArrayList<>();
       try {
-        getDataStreamOutput(outs);
+        getDataStreamOutput(outs, groupId);
       } catch (IOException e) {
         outs.forEach(CloseAsync::closeAsync);
         throw e;
@@ -105,10 +106,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       return outs;
     }
 
-    private void getDataStreamOutput(List<DataStreamOutput> outs) throws IOException {
+    private void getDataStreamOutput(List<DataStreamOutput> outs, RaftGroupId groupId) throws IOException {
       for (RaftPeer peer : peers) {
         try {
-          outs.add(map.getProxy(peer.getId()).stream());
+          outs.add(map.getProxy(peer.getId()).stream(groupId));
         } catch (IOException e) {
           throw new IOException(map.getName() + ": Failed to getDataStreamOutput for " + peer, e);
         }
@@ -245,7 +246,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
-      return new StreamInfo(request, stateMachine.data().stream(request), proxies.getDataStreamOutput());
+      return new StreamInfo(request, stateMachine.data().stream(request),
+          proxies.getDataStreamOutput(request.getRaftGroupId()));
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 739767a..67d2ecc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -182,6 +182,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   protected RaftProperties properties;
 
   private List<Server> servers;
+  private RaftGroup raftGroup;
 
   Server getPrimaryServer() {
     return servers.get(0);
@@ -318,6 +319,7 @@ abstract class DataStreamBaseTest extends BaseTest {
         .map(RaftPeerId::valueOf)
         .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
         .collect(Collectors.toList());
+    raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
     servers = new ArrayList<>(peers.size());
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
@@ -363,7 +365,7 @@ abstract class DataStreamBaseTest extends BaseTest {
         clients.add(client);
         for (int i = 0; i < numStreams; i++) {
           futures.add(CompletableFuture.runAsync(
-              () -> runTestDataStream((DataStreamOutputImpl) client.stream(), bufferSize, bufferNum)));
+              () -> runTestDataStream((DataStreamOutputImpl) client.stream(raftGroup.getGroupId()), bufferSize, bufferNum)));
         }
       }
       Assert.assertEquals(numClients*numStreams, futures.size());
@@ -419,6 +421,7 @@ abstract class DataStreamBaseTest extends BaseTest {
     final RaftClientRequest writeRequest = stream.getWriteRequest();
     Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
     Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
+    Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
   }