Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/10/20 06:29:13 UTC

[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

szetszwo commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508239363



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       peers and proxies must be consistent with each other.  Let me refactor the code to make that clear.
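
       One way a refactoring could keep the two consistent is to drop the separate peer list and use a single map as the only source of truth, so that a peer can never be listed without a proxy (or the reverse). The sketch below only illustrates that idea and is not the actual Ratis change: PeerProxies, addPeers and getProxies are hypothetical names, and the generic parameters ID and PROXY stand in for the peer id and DataStreamClient types.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical holder (not part of Ratis): one map is the single source of
 * truth, so the set of known peers and the set of proxies cannot diverge.
 */
final class PeerProxies<ID, PROXY> {
  // Insertion-ordered so that iteration order matches the order peers were added.
  private final Map<ID, PROXY> proxies = new LinkedHashMap<>();
  // Creates a proxy for a peer id; assumed to be cheap and non-blocking here.
  private final Function<ID, PROXY> factory;

  PeerProxies(Function<ID, PROXY> factory) {
    this.factory = factory;
  }

  /** Registers peers; a proxy is created at most once per peer id. */
  synchronized void addPeers(Collection<ID> newPeers) {
    for (ID id : newPeers) {
      proxies.computeIfAbsent(id, factory);
    }
  }

  /** Returns a snapshot of all proxies in insertion order. */
  synchronized List<PROXY> getProxies() {
    return new ArrayList<>(proxies.values());
  }
}
```

       With this shape, a method such as getDataStreamOutput() would iterate over getProxies() alone, and adding the same peer twice would not create a second proxy.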




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org