Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/10/20 04:30:10 UTC

[GitHub] [incubator-ratis] szetszwo opened a new pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

szetszwo opened a new pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225


   See https://issues.apache.org/jira/browse/RATIS-1099


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508208171



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       What if no client exists for a peer, and this `NettyServerStreamRpc` tries to create a new one with `newClient` but still fails to create it? Will it throw an IOException?
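
The question above concerns what a caller sees when a per-peer client is created lazily and creation fails. A minimal sketch of that pattern follows, using only the JDK. `PeerClient`, `ClientFactory` and `LazyClientMap` are hypothetical names invented for this illustration; they are not Ratis classes, and the sketch does not claim to describe how `PeerProxyMap` behaves.

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a per-peer client; not a Ratis class. */
interface PeerClient {
  String peerId();
}

/** Hypothetical factory that may fail, e.g. when the peer is unreachable. */
interface ClientFactory {
  PeerClient create(String peerId) throws IOException;
}

/** Creates clients lazily on first use and caches them by peer id. */
class LazyClientMap {
  private final Map<String, PeerClient> clients = new ConcurrentHashMap<>();
  private final ClientFactory factory;

  LazyClientMap(ClientFactory factory) {
    this.factory = factory;
  }

  /** Returns the cached client, creating it if absent. */
  PeerClient getClient(String peerId) throws IOException {
    final PeerClient cached = clients.get(peerId);
    if (cached != null) {
      return cached;
    }
    // Create outside of computeIfAbsent so the checked exception propagates
    // directly. Nothing is cached on failure, so a later call can retry.
    final PeerClient created = factory.create(peerId);
    final PeerClient raced = clients.putIfAbsent(peerId, created);
    return raced != null ? raced : created;
  }

  int size() {
    return clients.size();
  }

  public static void main(String[] args) {
    final LazyClientMap map = new LazyClientMap(id -> {
      throw new IOException("cannot connect to " + id);
    });
    try {
      map.getClient("s1");
    } catch (IOException e) {
      System.out.println("caller sees: " + e.getMessage());
    }
  }
}
```

In this sketch a failed creation surfaces as an `IOException` at the call site and leaves the cache unchanged. Whether the real code should retry, skip the peer, or fail the whole stream is a separate design decision.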







[GitHub] [incubator-ratis] runzhiwang merged pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
runzhiwang merged pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225


   





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508242484



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -121,29 +146,30 @@ private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext c
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {

Review comment:
       Let's get streaming working first and ignore exception handling for the moment. We will work on exception handling in separate issues. (Just as we separated out the retrying in this issue and didn't want RATIS-1082 to become too big.)







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508239363



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       peers and proxies must be consistent with each other. Let me refactor the code to make that clear.







[GitHub] [incubator-ratis] runzhiwang commented on pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#issuecomment-712643393


   @szetszwo LGTM. Just some minor comments, and the style error needs to be fixed.





[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508316141



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -121,29 +146,30 @@ private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext c
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
         final boolean isHeader = request.getStreamOffset() == -1;
 
-        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[peers.size() + 1];
 
         final CompletableFuture<?> localWrites = isHeader?
                 streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
                 : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
         parallelWrites[0] = localWrites;
-        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
 
           // do not need to forward header request
         if (isHeader) {
-          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
-            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          final List<DataStreamOutput> outs = getDataStreamOutput();
+          peersStreamOutput.put(request.getStreamId(), outs);
+          for (int i = 0; i < outs.size(); i++) {
+            parallelWrites[i + 1] = outs.get(i).getHeaderFuture();
           }
         } else {
           // body
-          for (int i = 0; i < clients.size(); i++) {
-            parallelWrites[i + 1]  =
-              peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+          final List<DataStreamOutput> outs = peersStreamOutput.get(request.getStreamId());
+          for (int i = 0; i < peers.size(); i++) {

Review comment:
       After refactoring, size() is not needed.  Will push a change soon.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508755380



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       Thanks for the update. It is clearer now how clients and stream outputs are created and how failures are handled.







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508209482



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -121,29 +146,30 @@ private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext c
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {

Review comment:
       Is there a need to add an exception handler here to handle IOException?
   
   ```
    private ChannelInitializer<SocketChannel> getInitializer(){
       return new ChannelInitializer<SocketChannel>(){
         @Override
         public void initChannel(SocketChannel ch) {
           ChannelPipeline p = ch.pipeline();
           p.addLast(new DataStreamRequestDecoder());
           p.addLast(new DataStreamReplyEncoder());
           p.addLast(getServerHandler());
         }
       };
    }
   ```







[GitHub] [incubator-ratis] runzhiwang closed pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
runzhiwang closed pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225


   





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508209482



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -121,29 +146,30 @@ private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext c
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {

Review comment:
       Is there a need to add an exception handler here to handle IOException?
   
   ```
    private ChannelInitializer<SocketChannel> getInitializer(){
       return new ChannelInitializer<SocketChannel>(){
         @Override
         public void initChannel(SocketChannel ch) {
           ChannelPipeline p = ch.pipeline();
           p.addLast(new DataStreamRequestDecoder());
           p.addLast(new DataStreamReplyEncoder());
           p.addLast(getServerHandler());
         }
       };
    }
   ```
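
One possible way to keep an `IOException` from escaping a read handler is to report it through the same future that carries the write results, so that the combined result fails cleanly. The sketch below is plain JDK code under that assumption. `IoStep`, `toFuture` and `replyFor` are hypothetical names for this illustration only; this is not what the pull request does, and the maintainers deferred exception handling to separate issues.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class FailureAsFuture {
  /** Hypothetical step that may throw a checked IOException. */
  interface IoStep<T> {
    T run() throws IOException;
  }

  /** Runs the step and reports an IOException through the returned future. */
  static <T> CompletableFuture<T> toFuture(IoStep<T> step) {
    final CompletableFuture<T> future = new CompletableFuture<>();
    try {
      future.complete(step.run());
    } catch (IOException e) {
      future.completeExceptionally(e);
    }
    return future;
  }

  /** Waits for all writes and returns a short status text for a reply. */
  static String replyFor(CompletableFuture<?>... writes) {
    return CompletableFuture.allOf(writes)
        .thenApply(ignored -> "ok")
        .exceptionally(t -> "failed: " + unwrap(t).getMessage())
        .join();
  }

  /** Dependent stages wrap the original failure in a CompletionException. */
  static Throwable unwrap(Throwable t) {
    return t instanceof CompletionException && t.getCause() != null ? t.getCause() : t;
  }

  public static void main(String[] args) {
    final CompletableFuture<Integer> local = toFuture(() -> 1);
    final CompletableFuture<Object> remote = toFuture(() -> {
      throw new IOException("peer down");
    });
    System.out.println(replyFor(local, remote));
  }
}
```

In a Netty pipeline, the alternative raised in the question would be to override `exceptionCaught(ChannelHandlerContext, Throwable)` in an inbound handler. Which of the two fits this server better is left open here.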







[GitHub] [incubator-ratis] runzhiwang commented on pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#issuecomment-713204506


   Reopen pr to trigger CI





[GitHub] [incubator-ratis] amaliujia commented on pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#issuecomment-713130398


   Filed https://issues.apache.org/jira/browse/RATIS-1101 for the flaky UT





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508208171



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       What if there is no client for a peer, and it tries to create a new one with `newClient` but still fails to create it? Should it throw an IOException?







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508208171



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       What if there is no client for a peer, and it tries to create a new one with `newClient` but still fails to create it? Will it throw an IOException?







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508254245



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -121,29 +146,30 @@ private void sendReply(DataStreamRequestByteBuf request, ChannelHandlerContext c
   private ChannelInboundHandler getServerHandler(){
     return new ChannelInboundHandlerAdapter(){
       @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) {
+      public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         final ByteBuf buf = request.slice();
         final boolean isHeader = request.getStreamOffset() == -1;
 
-        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[clients.size() + 1];
+        CompletableFuture<?>[] parallelWrites = new CompletableFuture<?>[peers.size() + 1];
 
         final CompletableFuture<?> localWrites = isHeader?
                 streams.computeIfAbsent(request.getStreamId(), id -> getDataStreamFuture(buf))
                 : streams.get(request.getStreamId()).thenApply(stream -> writeTo(buf, stream));
         parallelWrites[0] = localWrites;
-        peersStreamOutput.putIfAbsent(request.getStreamId(), getDataStreamOutput());
 
           // do not need to forward header request
         if (isHeader) {
-          for (int i = 0; i < peersStreamOutput.get(request.getStreamId()).size(); i++) {
-            parallelWrites[i + 1] = peersStreamOutput.get(request.getStreamId()).get(i).getHeaderFuture();
+          final List<DataStreamOutput> outs = getDataStreamOutput();
+          peersStreamOutput.put(request.getStreamId(), outs);
+          for (int i = 0; i < outs.size(); i++) {
+            parallelWrites[i + 1] = outs.get(i).getHeaderFuture();
           }
         } else {
           // body
-          for (int i = 0; i < clients.size(); i++) {
-            parallelWrites[i + 1]  =
-              peersStreamOutput.get(request.getStreamId()).get(i).writeAsync(request.slice().nioBuffer());
+          final List<DataStreamOutput> outs = peersStreamOutput.get(request.getStreamId());
+          for (int i = 0; i < peers.size(); i++) {

Review comment:
       How about peers.size() -> outs.size() ?
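
The suggestion above can be illustrated with a small JDK-only sketch. It assumes the peer list may grow between the header request (when the outputs are captured) and a later body request. `SnapshotFanOut` and its members are hypothetical names for this illustration, and plain strings stand in for the real stream outputs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotFanOut {
  /** Live membership; it may change at any time through addPeer. */
  private final List<String> peers = new CopyOnWriteArrayList<>();

  void addPeer(String peer) {
    peers.add(peer);
  }

  /** Header time: capture one output per peer known at this moment. */
  List<String> openOutputs() {
    final List<String> outs = new ArrayList<>();
    for (String peer : peers) {
      outs.add("out-" + peer);
    }
    return outs;
  }

  /** Body time: size and loop by the captured list; slot 0 is the local write. */
  static String[] fanOut(List<String> outs, String chunk) {
    final String[] writes = new String[outs.size() + 1];
    writes[0] = "local:" + chunk;
    for (int i = 0; i < outs.size(); i++) {
      writes[i + 1] = outs.get(i) + ":" + chunk;
    }
    return writes;
  }

  public static void main(String[] args) {
    final SnapshotFanOut server = new SnapshotFanOut();
    server.addPeer("s1");
    server.addPeer("s2");
    final List<String> outs = server.openOutputs();
    server.addPeer("s3"); // joins after the header was processed
    for (String write : fanOut(outs, "chunk0")) {
      System.out.println(write);
    }
  }
}
```

Looping to `outs.size()` keeps the index within the captured list even if a peer joins mid-stream. Looping to the live `peers.size()` could read past the end of `outs` in that case.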







[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#discussion_r508208171



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -65,23 +68,45 @@
   private final ConcurrentMap<Long, CompletableFuture<DataStream>> streams = new ConcurrentHashMap<>();
   private final ConcurrentMap<Long, List<DataStreamOutput>> peersStreamOutput = new ConcurrentHashMap<>();
 
-  private List<DataStreamClient> clients = new ArrayList<>();
+  private final List<RaftPeer> peers = new CopyOnWriteArrayList<>();
+  private final PeerProxyMap<DataStreamClient> proxies;
 
-  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine) {
-    this.raftServer = server;
+  public NettyServerStreamRpc(RaftPeer server, StateMachine stateMachine, RaftProperties properties) {
+    this.name = server + "-" + getClass().getSimpleName();
     this.stateMachine = stateMachine;
-    this.channelFuture = buildChannel();
+    this.channelFuture = new ServerBootstrap()
+        .group(bossGroup, workerGroup)
+        .channel(NioServerSocketChannel.class)
+        .handler(new LoggingHandler(LogLevel.INFO))
+        .childHandler(getInitializer())
+        .childOption(ChannelOption.SO_KEEPALIVE, true)
+        .localAddress(NetUtils.createSocketAddr(server.getAddress()))
+        .bind();
+
+    this.proxies = new PeerProxyMap<>(name, peer -> newClient(peer, properties));
   }
 
-  public NettyServerStreamRpc(
-      RaftPeer server, List<RaftPeer> otherPeers,
-      StateMachine stateMachine, RaftProperties properties){
-    this(server, stateMachine);
-    setupClient(otherPeers, properties);
+  static DataStreamClient newClient(RaftPeer peer, RaftProperties properties) {
+    final DataStreamClient client = DataStreamClient.newBuilder()
+        .setRaftServer(peer)
+        .setProperties(properties)
+        .build();
+    client.start();
+    return client;
   }
 
-  private List<DataStreamOutput> getDataStreamOutput() {
-    return clients.stream().map(client -> client.stream()).collect(Collectors.toList());
+  @Override
+  public void addPeers(Collection<RaftPeer> newPeers) {
+    proxies.addPeers(newPeers);
+    peers.addAll(newPeers);
+  }
+
+  private List<DataStreamOutput> getDataStreamOutput() throws IOException {
+    final List<DataStreamOutput> outs = new ArrayList<>();
+    for(RaftPeer peer : peers) {
+      outs.add(proxies.getProxy(peer.getId()).stream());

Review comment:
       What if there is no client for a peer, and it tries to create a new one with `newClient` but still fails to create it?







[GitHub] [incubator-ratis] runzhiwang commented on pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#issuecomment-713215841


   @szetszwo Thanks for the patch. @amaliujia Thanks for the review. I have merged it.





[GitHub] [incubator-ratis] amaliujia commented on pull request #225: RATIS-1099. DataStreamServerRpc should connect other peers automatically

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #225:
URL: https://github.com/apache/incubator-ratis/pull/225#issuecomment-712588014


   cc @amaliujia 

