You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/10/14 11:15:06 UTC

[pulsar] branch master updated: [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856)

This is an automated email from the ASF dual-hosted git repository.

shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b4518802f0d [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856)
b4518802f0d is described below

commit b4518802f0dfad857abf3575758a1f69aa9457f8
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Fri Oct 14 19:14:56 2022 +0800

    [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856)
    
    ### Motivation
    https://github.com/apache/pulsar/blob/b89c1451551a6bbe681465726906a2e61c9d8a69/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L282-L297
    The `pendingLookupRequestSemaphore` leaks when the channel becomes inactive: the permit held by a lookup request is not released when that request is removed from `pendingRequests`.
    
    ### Modifications
    We can't easily release the semaphore in `channelInactive`, because the pending requests are not only lookup requests. Instead, release the semaphore when the lookup future completes with a `ConnectException`.
    
    ### Verifying this change
    Add unit test cases to cover this change
    
    ### Documentation
    
    <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
    
    - [ ] `doc-required`
    (Your PR needs to update docs and you will update later)
    
    - [x] `doc-not-needed`
    bug fix, no doc needed
    
    - [ ] `doc`
    (Your PR contains doc changes)
    
    - [ ] `doc-complete`
    (Docs have been already added)
---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  8 +++
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 66 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a6b9005611c..14a33cd3203 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -141,6 +141,9 @@ public class ClientCnx extends PulsarHandler {
 
     private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
+
+    @VisibleForTesting
+    @Getter(AccessLevel.PACKAGE)
     private final Semaphore pendingLookupRequestSemaphore;
     private final Semaphore maxLookupRequestSemaphore;
     private final EventLoopGroup eventLoopGroup;
@@ -780,6 +783,11 @@ public class ClientCnx extends PulsarHandler {
         TimedCompletableFuture<LookupDataResult> future = new TimedCompletableFuture<>();
 
         if (pendingLookupRequestSemaphore.tryAcquire()) {
+            future.whenComplete((lookupDataResult, throwable) -> {
+                if (throwable instanceof ConnectException) {
+                    pendingLookupRequestSemaphore.release();
+                }
+            });
             addPendingLookupRequests(requestId, future);
             ctx.writeAndFlush(request).addListener(writeFuture -> {
                 if (!writeFuture.isSuccess()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index c6eba43fb7a..63aa7b7048b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -34,6 +34,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.Consumer;
@@ -50,6 +51,7 @@ import org.apache.pulsar.common.api.proto.ServerError;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.PulsarHandler;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
 import org.testng.annotations.Test;
 
 public class ClientCnxTest {
@@ -80,6 +82,70 @@ public class ClientCnxTest {
         eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testPendingLookupRequestSemaphore() throws Exception {
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(10_000);
+        conf.setKeepAliveIntervalSeconds(0);
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+        cnx.channelActive(ctx);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        CompletableFuture<Exception> completableFuture = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(1_000);
+                CompletableFuture<BinaryProtoLookupService.LookupDataResult> future =
+                        cnx.newLookup(null, 123);
+                countDownLatch.countDown();
+                future.get();
+            } catch (Exception e) {
+                completableFuture.complete(e);
+            }
+        }).start();
+        countDownLatch.await();
+        cnx.channelInactive(ctx);
+        assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.ConnectException);
+        // wait for subsequent calls over
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
+        });
+        eventLoop.shutdownGracefully();
+    }
+
+    @Test
+    public void testPendingWaitingLookupRequestSemaphore() throws Exception {
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(10_000);
+        conf.setKeepAliveIntervalSeconds(0);
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+        cnx.channelActive(ctx);
+        for (int i = 0; i < 5001; i++) {
+            cnx.newLookup(null, i);
+        }
+        cnx.channelInactive(ctx);
+        // wait for subsequent calls over
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
+        });
+        eventLoop.shutdownGracefully();
+    }
+
     @Test
     public void testReceiveErrorAtSendConnectFrameState() throws Exception {
         ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");