You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/01 12:53:25 UTC

[ratis] branch master updated: RATIS-1447. Cleanup NettyClientStreamRpc#replies empty queue (#542)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new eb1ea9f  RATIS-1447. Cleanup NettyClientStreamRpc#replies empty queue (#542)
eb1ea9f is described below

commit eb1ea9feee179139cc7e8a9faed2be3159e722d3
Author: hao guo <gu...@360.cn>
AuthorDate: Wed Dec 1 20:53:17 2021 +0800

    RATIS-1447. Cleanup NettyClientStreamRpc#replies empty queue (#542)
---
 .../org/apache/ratis/netty/NettyConfigKeys.java    | 13 +++++
 .../ratis/netty/client/NettyClientStreamRpc.java   | 62 ++++++++++++++++++----
 2 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
index 9d7f45d..907cda6 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java
@@ -19,6 +19,7 @@ package org.apache.ratis.netty;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.thirdparty.io.netty.util.NettyRuntime;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,6 +94,18 @@ public interface NettyConfigKeys {
     static void setClientWorkerGroupShare(RaftProperties properties, boolean clientWorkerGroupShare) {
       setBoolean(properties::setBoolean, CLIENT_WORKER_GROUP_SHARE_KEY, clientWorkerGroupShare);
     }
+
+    String CLIENT_REPLY_QUEUE_GRACE_PERIOD_KEY = PREFIX + ".client.reply.queue.grace-period";
+    TimeDuration CLIENT_REPLY_QUEUE_GRACE_PERIOD_DEFAULT = TimeDuration.ONE_SECOND;
+
+    static TimeDuration clientReplyQueueGracePeriod(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(CLIENT_REPLY_QUEUE_GRACE_PERIOD_DEFAULT.getUnit()),
+          CLIENT_REPLY_QUEUE_GRACE_PERIOD_KEY, CLIENT_REPLY_QUEUE_GRACE_PERIOD_DEFAULT, getDefaultLog());
+    }
+
+    static void setClientReplyQueueGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) {
+      setTimeDuration(properties::setTimeDuration, CLIENT_REPLY_QUEUE_GRACE_PERIOD_KEY, timeoutDuration);
+    }
   }
 
   static void main(String[] args) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 39a73fd..1734e5b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -38,10 +38,12 @@ import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
 import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -87,11 +89,41 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     }
   }
 
+  static class ReplyQueue implements Iterable<CompletableFuture<DataStreamReply>> {
+    static final ReplyQueue EMPTY = new ReplyQueue();
+
+    private final Queue<CompletableFuture<DataStreamReply>> queue = new ConcurrentLinkedQueue<>();
+    private int emptyId;
+
+    /** @return an empty ID if the queue is empty; otherwise, the queue is non-empty, return null. */
+    synchronized Integer getEmptyId() {
+      return queue.isEmpty()? emptyId: null;
+    }
+
+    synchronized boolean offer(CompletableFuture<DataStreamReply> f) {
+      if (queue.offer(f)) {
+        emptyId++;
+        return true;
+      }
+      return false;
+    }
+
+    CompletableFuture<DataStreamReply> poll() {
+      return queue.poll();
+    }
+
+    @Override
+    public Iterator<CompletableFuture<DataStreamReply>> iterator() {
+      return queue.iterator();
+    }
+  }
+
   private final String name;
   private final WorkerGroupGetter workerGroup;
   private final Supplier<Channel> channel;
-  private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
-      new ConcurrentHashMap<>();
+  private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
+  private final TimeDuration replyQueueGracePeriod;
+  private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance();
 
   public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
@@ -103,6 +135,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         .option(ChannelOption.SO_KEEPALIVE, true)
         .connect(NetUtils.createSocketAddr(server.getDataStreamAddress()));
     this.channel = JavaUtils.memoize(() -> f.syncUninterruptibly().channel());
+    this.replyQueueGracePeriod = NettyConfigKeys.DataStream.clientReplyQueueGracePeriod(properties);
   }
 
   private Channel getChannel() {
@@ -123,16 +156,28 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         final DataStreamReply reply = (DataStreamReply) msg;
         LOG.debug("{}: read {}", this, reply);
         clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
-        Optional.ofNullable(replies.get(clientInvocationId))
-            .map(Queue::poll)
-            .ifPresent(f -> f.complete(reply));
+        final ReplyQueue queue = replies.get(clientInvocationId);
+        if (queue != null) {
+          final CompletableFuture<DataStreamReply> f = queue.poll();
+          if (f != null) {
+            f.complete(reply);
+            final Integer emptyId = queue.getEmptyId();
+            if (emptyId != null) {
+              timeoutScheduler.onTimeout(replyQueueGracePeriod,
+                  // remove the queue if the same queue has been empty for the entire grace period.
+                  () -> replies.computeIfPresent(clientInvocationId,
+                      (key, q) -> q == queue && emptyId.equals(q.getEmptyId())? null: q),
+                  LOG, () -> "Timeout check failed, clientInvocationId=" + clientInvocationId);
+            }
+          }
+        }
       }
 
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
         Optional.ofNullable(clientInvocationId)
             .map(replies::remove)
-            .orElseGet(LinkedList::new)
+            .orElse(ReplyQueue.EMPTY)
             .forEach(f -> f.completeExceptionally(cause));
 
         LOG.warn(name + ": exceptionCaught", cause);
@@ -189,8 +234,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
     final CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
     ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
-    final Queue<CompletableFuture<DataStreamReply>> q = replies.computeIfAbsent(
-       clientInvocationId, key -> new ConcurrentLinkedQueue<>());
+    final ReplyQueue q = replies.computeIfAbsent(clientInvocationId, key -> new ReplyQueue());
     if (!q.offer(f)) {
       f.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
       return f;