You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/10 07:46:28 UTC
[incubator-ratis] branch master updated: RATIS-1140. Do not create
DataStreamOutput for non-primary server (#264)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new f4c2cec RATIS-1140. Do not create DataStreamOutput for non-primary server (#264)
f4c2cec is described below
commit f4c2cec00854e290855c1187687e4e7d123d036c
Author: Rui Wang <am...@users.noreply.github.com>
AuthorDate: Mon Nov 9 23:34:33 2020 -0800
RATIS-1140. Do not create DataStreamOutput for non-primary server (#264)
* RATIS-1140. Does not create DataStreamOutput for non-primary server
* trigger new CI check
---
.../org/apache/ratis/netty/server/NettyServerStreamRpc.java | 4 +++-
.../org/apache/ratis/datastream/DataStreamBaseTest.java | 13 +++++++------
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index e7fec64..496e83c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -57,6 +57,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -253,7 +254,8 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
return new StreamInfo(request, stateMachine.data().stream(request),
- proxies.getDataStreamOutput(request));
+ server.getId().equals(request.getServerId())?
+ proxies.getDataStreamOutput(request) : Collections.EMPTY_LIST);
} catch (Throwable e) {
throw new CompletionException(e);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 680e5de..010eca8 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -335,17 +335,18 @@ abstract class DataStreamBaseTest extends BaseTest {
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
final Server server = new Server(peers.get(i), raftServers.get(i));
- if (i == 0) {
- // only the first server routes requests to peers.
- List<RaftPeer> otherPeers = new ArrayList<>(peers);
- otherPeers.remove(peers.get(i));
- server.addRaftPeers(otherPeers);
- }
+ server.addRaftPeers(removePeerFromList(peers.get(i), peers));
server.start();
servers.add(server);
}
}
+ private Collection<RaftPeer> removePeerFromList(RaftPeer peer, List<RaftPeer> peers) {
+ List<RaftPeer> otherPeers = new ArrayList<>(peers);
+ otherPeers.remove(peer);
+ return otherPeers;
+ }
+
RaftClient newRaftClientForDataStream() {
return RaftClient.newBuilder()
.setRaftGroup(raftGroup)