Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2022/03/09 04:20:08 UTC

[GitHub] [ratis] guohao-rosicky opened a new pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

guohao-rosicky opened a new pull request #620:
URL: https://github.com/apache/ratis/pull/620


   ## What changes were proposed in this pull request?
   
   If the stream client is abnormally disconnected, reconnect the stream client.
   
   For example, when:
   1. The socket channel is closed
   2. The network connection is down
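   As a rough illustration of the reconnect behavior (not the code in this PR), the sketch below retries a connection attempt after a fixed delay, similar in spirit to the 100 ms RECONNECT delay used by scheduleReconnect() in the review diff. FixedDelayReconnector and its tryConnect callback are hypothetical; only the JDK's ScheduledExecutorService is assumed, and the sketch retries without an upper bound until an attempt succeeds.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.BooleanSupplier;

   /** Hypothetical helper: retries a connection attempt after a fixed delay until it succeeds. */
   final class FixedDelayReconnector {
     private final ScheduledExecutorService scheduler;
     private final BooleanSupplier tryConnect; // returns true once the connection is established
     private final long delayMillis;
     private final AtomicInteger attempts = new AtomicInteger();
     private final CompletableFuture<Integer> connected = new CompletableFuture<>();

     FixedDelayReconnector(ScheduledExecutorService scheduler, BooleanSupplier tryConnect, long delayMillis) {
       this.scheduler = scheduler;
       this.tryConnect = tryConnect;
       this.delayMillis = delayMillis;
     }

     /** Starts connecting; the returned future completes with the number of attempts used. */
     CompletableFuture<Integer> start() {
       attempt();
       return connected;
     }

     private void attempt() {
       final int n = attempts.incrementAndGet();
       if (tryConnect.getAsBoolean()) {
         connected.complete(n);
       } else {
         // Do not retry in a tight loop: schedule the next attempt after a fixed delay.
         scheduler.schedule(this::attempt, delayMillis, TimeUnit.MILLISECONDS);
       }
     }
   }
   ```

   For example, with Executors.newSingleThreadScheduledExecutor() and a tryConnect that first succeeds on its third call, start().join() returns 3. A fixed delay keeps the sketch simple; a real client might prefer backoff or a retry limit.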
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1549


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ratis.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [ratis] captainzmc commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
captainzmc commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1064953507


   Thanks @guohao-rosicky for updating this. I ran a comparative test and found that this patch does mitigate the client hang problem. When three clients wrote 400 files concurrently, the Ratis master branch hung every time, while the RATIS-1549 branch ran normally.
   ![image](https://user-images.githubusercontent.com/13825159/157841596-fda8285c-feae-4245-90f0-247b283f8a9e.png)
   
   Hi @szetszwo, it is difficult to simulate a broken connection during a write in a unit test. Can we continue to discuss this patch based on the test results above?
   
   





[GitHub] [ratis] szetszwo commented on a change in pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #620:
URL: https://github.com/apache/ratis/pull/620#discussion_r825835544



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -261,15 +362,27 @@ protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> o
       f.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
       return f;
     }
+    final Channel channel = connection.getChannelUninterruptibly();
+    if (channel == null) {
+      f.completeExceptionally(new AlreadyClosedException(this + ": Failed to send " + request));
+      return f;
+    }
     LOG.debug("{}: write {}", this, request);
-    getChannel().writeAndFlush(request);
+    channel.writeAndFlush(request).addListener(future -> {
+      if (!future.isSuccess()) {
+        final IOException e = new IOException(this + ": Failed to send " + request, future.cause());
+        LOG.error("Channel write failed", e);
+        if (!f.isDone()) {

Review comment:
       We don't have to check isDone().  The completeExceptionally API has already taken care of it; see https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#completeExceptionally-java.lang.Throwable-
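       A minimal stdlib-only sketch of the point above (the class and method names are hypothetical): completeExceptionally() only affects a future that is not yet completed; otherwise it returns false and leaves the existing result untouched, so a separate isDone() check adds nothing.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;

   class CompleteExceptionallyDemo {
     /** Completes f with a reply, then simulates a late write failure; returns whether the failure took effect. */
     static boolean lateFailureTakesEffect(CompletableFuture<String> f) {
       f.complete("reply");
       // No isDone() guard: completeExceptionally() is a no-op on an already-completed future.
       return f.completeExceptionally(new IOException("Failed to send"));
     }

     public static void main(String[] args) {
       final CompletableFuture<String> f = new CompletableFuture<>();
       System.out.println(lateFailureTakesEffect(f)); // false
       System.out.println(f.join());                  // reply
     }
   }
   ```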

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -125,30 +139,109 @@ int size() {
     }
   }
 
+  static class Connection {
+    static final TimeDuration RECONNECT = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+    private final String address;
+    private final WorkerGroupGetter workerGroup;
+    private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;
+
+    /** The {@link ChannelFuture} is null when this connection is closed. */
+    private final AtomicReference<ChannelFuture> ref;
+
+    Connection(String address, WorkerGroupGetter workerGroup,
+        Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier) {
+      this.address = address;
+      this.workerGroup = workerGroup;
+      this.channelInitializerSupplier = channelInitializerSupplier;
+      this.ref = new AtomicReference<>(connect());
+    }
+
+    Channel getChannelUninterruptibly() {
+      final ChannelFuture future = ref.get();
+      if (future == null) {
+        return null; //closed
+      }
+      final Channel channel = future.syncUninterruptibly().channel();
+      if (channel.isOpen()) {
+        return channel;
+      }
+      return reconnect().syncUninterruptibly().channel();
+    }
+
+    private EventLoopGroup getWorkerGroup() {
+      return workerGroup.get();
+    }
+
+    private ChannelFuture connect() {
+      return new Bootstrap()
+          .group(getWorkerGroup())
+          .channel(NioSocketChannel.class)
+          .handler(channelInitializerSupplier.get())
+          .option(ChannelOption.SO_KEEPALIVE, true)
+          .connect(NetUtils.createSocketAddr(address))
+          .addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) {
+              if (!future.isSuccess()) {
+                scheduleReconnect(this + " failed", future.cause());
+              } else {
+                LOG.trace("{} succeed.", this);
+              }
+            }
+          });
+    }
+
+    void scheduleReconnect(String message, Throwable cause) {
+      LOG.warn("{}: {}; schedule reconnecting to {} in {}", this, message, address, RECONNECT);
+      if (cause != null) {
+        LOG.warn("", cause);
+      }
+      getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit());
+    }
+
+    private ChannelFuture reconnect() {
+      final MemoizedSupplier<ChannelFuture> supplier = MemoizedSupplier.valueOf(this::connect);
+      final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get());
+      if (previous != null) {
+        previous.channel().close();
+      }
+      return supplier.get();

Review comment:
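       We should check supplier.isInitialized(). Otherwise, when the connection is already closed (ref holds null), supplier.get() would still call connect() and open a new connection that is never stored in ref.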
   ```
         return supplier.isInitialized()? supplier.get(): null;
   ```
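       To make the suggestion concrete, here is a stdlib-only sketch of the reconnect() logic under stated assumptions: a String stands in for Netty's ChannelFuture, and Memo is a hypothetical minimal stand-in for Ratis's MemoizedSupplier. Memoization keeps connect() to at most one call per reconnect() even if getAndUpdate() re-runs its update function under contention, and the isInitialized() check makes reconnect() return null when the update function never created a channel, e.g. because the connection was already closed.

   ```java
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Supplier;

   /** Hypothetical minimal memoizing supplier, standing in for Ratis's MemoizedSupplier. */
   final class Memo<T> implements Supplier<T> {
     private final Supplier<T> initializer;
     private T value;
     private boolean initialized;

     Memo(Supplier<T> initializer) {
       this.initializer = initializer;
     }

     @Override
     public synchronized T get() {
       if (!initialized) {
         value = initializer.get();
         initialized = true;
       }
       return value;
     }

     synchronized boolean isInitialized() {
       return initialized;
     }
   }

   /** Hypothetical connection holder; a String stands in for Netty's ChannelFuture. */
   final class ConnectionSketch {
     private final AtomicReference<String> ref; // null means this connection is closed
     private int attempts;
     private volatile String lastClosed;

     ConnectionSketch() {
       ref = new AtomicReference<>(connect());
     }

     private synchronized String connect() {
       return "channel-" + (++attempts);
     }

     /** Swaps in a new channel unless closed; returns null when no new channel was created. */
     String reconnect() {
       final Memo<String> supplier = new Memo<>(this::connect);
       final String previous = ref.getAndUpdate(prev -> prev == null ? null : supplier.get());
       if (previous != null) {
         lastClosed = previous; // the real code closes previous.channel() here
       }
       return supplier.isInitialized() ? supplier.get() : null;
     }

     void close() {
       ref.set(null);
     }

     String current() { return ref.get(); }
     String lastClosed() { return lastClosed; }
     synchronized int attempts() { return attempts; }
   }
   ```

       After close(), reconnect() returns null and attempts() stays unchanged, which is the behavior the isInitialized() check is meant to guarantee.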







[GitHub] [ratis] guohao-rosicky edited a comment on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky edited a comment on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1062626001


   1. ctx.writeAndFlush() is asynchronous: it hands the request to the channel pipeline for writing to the socket and returns a ChannelFuture right away, before the write has necessarily completed, so we need to add a listener to learn whether the write succeeded.
   2. If the channel is inactive, sending on the current channel will fail, so we need to trigger a reconnection.
   3. When connecting to the server, we add a listener to track the state of the connection.
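   The points above can be sketched without Netty, assuming a hypothetical AsyncChannel interface whose writeAndFlush() returns a CompletableFuture in place of Netty's ChannelFuture. The listener propagates a write failure to the caller's reply future and flags that a reconnection is needed; StreamClientSketch is an illustration, not the code in this PR.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicBoolean;

   /** Hypothetical channel: writeAndFlush returns immediately with a future, like Netty's ChannelFuture. */
   interface AsyncChannel {
     CompletableFuture<Void> writeAndFlush(Object msg);
   }

   final class StreamClientSketch {
     private final AsyncChannel channel;
     final AtomicBoolean reconnectRequested = new AtomicBoolean();

     StreamClientSketch(AsyncChannel channel) {
       this.channel = channel;
     }

     /** Returns a future for the reply; it fails right away if the write itself fails. */
     CompletableFuture<String> send(Object request) {
       final CompletableFuture<String> reply = new CompletableFuture<>();
       // The write outcome is only observable through a listener on the returned future.
       channel.writeAndFlush(request).whenComplete((ignored, cause) -> {
         if (cause != null) {
           reply.completeExceptionally(new IOException("Failed to send " + request, cause));
           reconnectRequested.set(true); // the channel is unusable, so ask for a reconnection
         }
       });
       return reply; // on success it stays pending until the server's reply arrives
     }
   }
   ```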





[GitHub] [ratis] szetszwo commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1066055079


   @guohao-rosicky, I suggest refactoring the connection-related code into a class; see https://issues.apache.org/jira/secure/attachment/13041023/620_review.patch





[GitHub] [ratis] guohao-rosicky commented on a change in pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on a change in pull request #620:
URL: https://github.com/apache/ratis/pull/620#discussion_r825895461



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -125,30 +139,109 @@ int size() {
+      return supplier.get();

Review comment:
       Done. @szetszwo, please take a look.







[GitHub] [ratis] szetszwo merged pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #620:
URL: https://github.com/apache/ratis/pull/620


   





[GitHub] [ratis] guohao-rosicky commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1062626001


   1. ctx.writeAndFlush() is asynchronous: it hands the request to the channel pipeline for writing to the socket and returns a ChannelFuture right away, before the write has necessarily completed, so we need to add a listener to learn whether the write succeeded.
   2. If the channel is inactive, sending on the current channel will fail, so we need to trigger a reconnection.
   3. When we connect, we add a listener to track the state of the connection.





[GitHub] [ratis] guohao-rosicky commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1064754317


   @captainzmc, can you share the test results of this patch?





[GitHub] [ratis] guohao-rosicky commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1066560612


   @szetszwo, I have applied your patch.





[GitHub] [ratis] guohao-rosicky commented on a change in pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on a change in pull request #620:
URL: https://github.com/apache/ratis/pull/620#discussion_r825891192



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -261,15 +362,27 @@ protected void decode(ChannelHandlerContext context, ByteBuf buf, List<Object> o
+        if (!f.isDone()) {

Review comment:
       I see. Thank you.







[GitHub] [ratis] guohao-rosicky commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1062539537


   @szetszwo, what do you think of this feature?





[GitHub] [ratis] guohao-rosicky commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1064753329


   Okay, I'll add a unit test, @szetszwo.





[GitHub] [ratis] guohao-rosicky removed a comment on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
guohao-rosicky removed a comment on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1064753329


   Okay, I'll add a unit test, @szetszwo.





[GitHub] [ratis] szetszwo commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1064137764


   @guohao-rosicky , please add some tests.  We cannot easily tell if the change is working.





[GitHub] [ratis] szetszwo commented on pull request #620: RATIS-1549. If the stream client is abnormally disconnected, reconnec…

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #620:
URL: https://github.com/apache/ratis/pull/620#issuecomment-1066037981


   @captainzmc , @guohao-rosicky , thanks for testing it manually.  I will review this change first.  Let me think about how to add a unit test.  

