Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/10/10 02:38:30 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17856: [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive

codelipenghui commented on code in PR #17856:
URL: https://github.com/apache/pulsar/pull/17856#discussion_r990882324


##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java:
##########
@@ -80,6 +82,38 @@ public void testClientCnxTimeout() throws Exception {
         eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testPendingLookupRequestSemaphore() throws Exception {
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(10_000);
+        conf.setKeepAliveIntervalSeconds(0);
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+        cnx.channelActive(ctx);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                cnx.newLookup(null, 123).get();
+            } catch (Exception e) {
+                // ignore exception
+            }
+            countDownLatch.countDown();
+        }).start();
+        cnx.channelInactive(ctx);

Review Comment:
   Could this make the test flaky? If you add `Thread.sleep(1000);` before `cnx.newLookup(null, 123).get();`, the test gets stuck.
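
   A minimal sketch of one way such a hang can arise. It assumes, without confirming it for `ClientCnx`, that a lookup issued after `channelInactive` returns a future that nothing ever completes. An unbounded `get()` on that future blocks forever, while a bounded `get` surfaces the problem as a timeout. `BoundedGetSketch` and `completesWithin` are hypothetical names used only for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of Pulsar.
final class BoundedGetSketch {
    private BoundedGetSketch() {}

    // Returns true if the future finished (normally, exceptionally, or by
    // cancellation) within timeoutMs, and false if it was still pending.
    static boolean completesWithin(CompletableFuture<?> future, long timeoutMs) {
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | CancellationException e) {
            // The future finished, just not successfully.
            return true;
        } catch (TimeoutException e) {
            // Nothing completed the future in time; an unbounded get() would still be blocked.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println("pending future completed in time: "
                + completesWithin(neverCompleted, 100));

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("connection closed"));
        System.out.println("failed future completed in time: "
                + completesWithin(failed, 100));
    }
}
```

   With a bound like this, a late lookup fails the test after the timeout instead of blocking the run.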



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java:
##########
@@ -80,6 +82,38 @@ public void testClientCnxTimeout() throws Exception {
         eventLoop.shutdownGracefully();
     }
 
+    @Test
+    public void testPendingLookupRequestSemaphore() throws Exception {
+        EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+        ClientConfigurationData conf = new ClientConfigurationData();
+        conf.setOperationTimeoutMs(10_000);
+        conf.setKeepAliveIntervalSeconds(0);
+        ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        Channel channel = mock(Channel.class);
+        when(ctx.channel()).thenReturn(channel);
+        ChannelFuture listenerFuture = mock(ChannelFuture.class);
+        when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+        when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+        cnx.channelActive(ctx);
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        new Thread(() -> {
+            try {
+                cnx.newLookup(null, 123).get();
+            } catch (Exception e) {
+                // ignore exception
+            }
+            countDownLatch.countDown();
+        }).start();
+        cnx.channelInactive(ctx);
+        countDownLatch.await();
+        // wait for subsequent calls over
+        Thread.sleep(1_000);

Review Comment:
   Looks like we can use Awaitility instead.
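
   The idea behind this suggestion is to poll for the expected state with a deadline rather than sleep for a fixed second. The sketch below shows that pattern in plain Java so it runs without extra dependencies. `ConditionWait` and `waitUntil` are hypothetical stand-ins, not the Awaitility API, and the semaphore is only a placeholder for whatever state the real test checks.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; a library such as Awaitility
// provides a richer version of the same polling idea.
final class ConditionWait {
    private ConditionWait() {}

    // Polls the condition until it holds (returns true) or the timeout
    // elapses (returns false). Returns false if the thread is interrupted.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Semaphore permits = new Semaphore(0);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Simulate asynchronous cleanup that releases a permit a little later.
        scheduler.schedule(() -> permits.release(), 50, TimeUnit.MILLISECONDS);

        boolean released = waitUntil(() -> permits.availablePermits() == 1,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        scheduler.shutdown();
        System.out.println("permit released: " + released);
    }
}
```

   Compared with `Thread.sleep(1_000)`, the wait ends as soon as the condition holds, and the timeout can be generous without slowing down the passing case.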



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -44,6 +44,8 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.BiConsumer;
+

Review Comment:
   Remove these 2 lines.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java:
##########
@@ -803,6 +808,11 @@ public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long reque
                     waitingLookupRequests.size())));
             }
         }
+        future.whenComplete((lookupDataResult, throwable) -> {
+            if (throwable instanceof ConnectException) {

Review Comment:
   Why should only `ConnectException` be handled?
   IMO, shouldn't we release the semaphore for all exceptions except `TooManyRequestsException`?
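
   A minimal sketch of the policy proposed here, under stated assumptions: `LookupPermitSketch` is a hypothetical stand-in for the relevant part of `ClientCnx`, and the nested `TooManyRequestsException` is a local placeholder for the client's own exception type. It assumes a rejected request never acquired a permit, so it has nothing to release, while every other failure gives its permit back.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

// Hypothetical stand-in for illustration; not the actual ClientCnx code.
final class LookupPermitSketch {
    // Local placeholder for the client's TooManyRequestsException.
    static final class TooManyRequestsException extends RuntimeException {
        TooManyRequestsException(String message) {
            super(message);
        }
    }

    private final Semaphore pendingLookups;

    LookupPermitSketch(int maxPendingLookups) {
        this.pendingLookups = new Semaphore(maxPendingLookups);
    }

    int availablePermits() {
        return pendingLookups.availablePermits();
    }

    CompletableFuture<String> newLookup() {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (!pendingLookups.tryAcquire()) {
            // Rejected without holding a permit.
            future.completeExceptionally(new TooManyRequestsException("too many pending lookups"));
        }
        future.whenComplete((result, throwable) -> {
            // Release on any failure except the rejection above, which never held a permit.
            // Releasing on success is assumed to happen elsewhere and is not modeled here.
            if (throwable != null && !(throwable instanceof TooManyRequestsException)) {
                pendingLookups.release();
            }
        });
        return future;
    }

    public static void main(String[] args) {
        LookupPermitSketch cnx = new LookupPermitSketch(1);
        CompletableFuture<String> first = cnx.newLookup();
        CompletableFuture<String> rejected = cnx.newLookup();
        System.out.println("second lookup rejected: " + rejected.isCompletedExceptionally());

        first.completeExceptionally(new IOException("channel closed"));
        System.out.println("permits after failure: " + cnx.availablePermits());
    }
}
```

   Checking for "anything but the rejection" rather than for `ConnectException` alone means timeouts and other failure paths cannot leak a permit either.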



##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java:
##########
@@ -34,7 +34,9 @@
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;

Review Comment:
   Please remove this line.


