You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/10 07:46:28 UTC

[incubator-ratis] branch master updated: RATIS-1140. Do not create DataStreamOutput for non-primary server (#264)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new f4c2cec  RATIS-1140. Do not create DataStreamOutput for non-primary server (#264)
f4c2cec is described below

commit f4c2cec00854e290855c1187687e4e7d123d036c
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Nov 9 23:34:33 2020 -0800

    RATIS-1140. Do not create DataStreamOutput for non-primary server (#264)
    
    * RATIS-1140. Do not create DataStreamOutput for non-primary server
    
    * trigger new CI check
---
 .../org/apache/ratis/netty/server/NettyServerStreamRpc.java |  4 +++-
 .../org/apache/ratis/datastream/DataStreamBaseTest.java     | 13 +++++++------
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index e7fec64..496e83c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -57,6 +57,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
@@ -253,7 +254,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
           RaftClientRequestProto.parseFrom(buf.nioBuffer()));
       final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
       return new StreamInfo(request, stateMachine.data().stream(request),
-          proxies.getDataStreamOutput(request));
+          server.getId().equals(request.getServerId())?
+          proxies.getDataStreamOutput(request) : Collections.EMPTY_LIST);
     } catch (Throwable e) {
       throw new CompletionException(e);
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 680e5de..010eca8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -335,17 +335,18 @@ abstract class DataStreamBaseTest extends BaseTest {
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
       final Server server = new Server(peers.get(i), raftServers.get(i));
-      if (i == 0) {
-        // only the first server routes requests to peers.
-        List<RaftPeer> otherPeers = new ArrayList<>(peers);
-        otherPeers.remove(peers.get(i));
-        server.addRaftPeers(otherPeers);
-      }
+      server.addRaftPeers(removePeerFromList(peers.get(i), peers));
       server.start();
       servers.add(server);
     }
   }
 
+  private Collection<RaftPeer> removePeerFromList(RaftPeer peer, List<RaftPeer> peers) {
+    List<RaftPeer> otherPeers = new ArrayList<>(peers);
+    otherPeers.remove(peer);
+    return otherPeers;
+  }
+
   RaftClient newRaftClientForDataStream() {
     return RaftClient.newBuilder()
         .setRaftGroup(raftGroup)