You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sh...@apache.org on 2022/10/14 11:15:06 UTC
[pulsar] branch master updated: [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856)
This is an automated email from the ASF dual-hosted git repository.
shoothzj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b4518802f0d [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856)
b4518802f0d is described below
commit b4518802f0dfad857abf3575758a1f69aa9457f8
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Fri Oct 14 19:14:56 2022 +0800
[fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when channel inactive (#17856)
### Motivation
https://github.com/apache/pulsar/blob/b89c1451551a6bbe681465726906a2e61c9d8a69/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L282-L297
The `pendingLookupRequestSemaphore` leaks when the channel becomes inactive: permits acquired for lookup requests are not released when those requests are removed from `pendingRequests`.
### Modifications
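The semaphore cannot easily be released in `channelInactive`, because `pendingRequests` holds other request types besides lookup requests. Instead, each lookup future registers a completion callback that releases the semaphore when the future completes with a `ConnectException`.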
### Verifying this change
Added unit test cases to cover this change.
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
Bug fix; no documentation needed.
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 8 +++
.../apache/pulsar/client/impl/ClientCnxTest.java | 66 ++++++++++++++++++++++
2 files changed, 74 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index a6b9005611c..14a33cd3203 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -141,6 +141,9 @@ public class ClientCnx extends PulsarHandler {
private final CompletableFuture<Void> connectionFuture = new CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue<>();
+
+ @VisibleForTesting
+ @Getter(AccessLevel.PACKAGE)
private final Semaphore pendingLookupRequestSemaphore;
private final Semaphore maxLookupRequestSemaphore;
private final EventLoopGroup eventLoopGroup;
@@ -780,6 +783,11 @@ public class ClientCnx extends PulsarHandler {
TimedCompletableFuture<LookupDataResult> future = new TimedCompletableFuture<>();
if (pendingLookupRequestSemaphore.tryAcquire()) {
+ future.whenComplete((lookupDataResult, throwable) -> {
+ if (throwable instanceof ConnectException) {
+ pendingLookupRequestSemaphore.release();
+ }
+ });
addPendingLookupRequests(requestId, future);
ctx.writeAndFlush(request).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index c6eba43fb7a..63aa7b7048b 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -34,6 +34,7 @@ import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.function.Consumer;
@@ -50,6 +51,7 @@ import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.awaitility.Awaitility;
import org.testng.annotations.Test;
public class ClientCnxTest {
@@ -80,6 +82,70 @@ public class ClientCnxTest {
eventLoop.shutdownGracefully();
}
+ @Test
+ public void testPendingLookupRequestSemaphore() throws Exception {
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(10_000);
+ conf.setKeepAliveIntervalSeconds(0);
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ when(ctx.channel()).thenReturn(channel);
+ ChannelFuture listenerFuture = mock(ChannelFuture.class);
+ when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+ when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+ cnx.channelActive(ctx);
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ CompletableFuture<Exception> completableFuture = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(1_000);
+ CompletableFuture<BinaryProtoLookupService.LookupDataResult> future =
+ cnx.newLookup(null, 123);
+ countDownLatch.countDown();
+ future.get();
+ } catch (Exception e) {
+ completableFuture.complete(e);
+ }
+ }).start();
+ countDownLatch.await();
+ cnx.channelInactive(ctx);
+ assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.ConnectException);
+ // wait for subsequent calls over
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
+ });
+ eventLoop.shutdownGracefully();
+ }
+
+ @Test
+ public void testPendingWaitingLookupRequestSemaphore() throws Exception {
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(10_000);
+ conf.setKeepAliveIntervalSeconds(0);
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ when(ctx.channel()).thenReturn(channel);
+ ChannelFuture listenerFuture = mock(ChannelFuture.class);
+ when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+ when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+ cnx.channelActive(ctx);
+ for (int i = 0; i < 5001; i++) {
+ cnx.newLookup(null, i);
+ }
+ cnx.channelInactive(ctx);
+ // wait for subsequent calls over
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
+ });
+ eventLoop.shutdownGracefully();
+ }
+
@Test
public void testReceiveErrorAtSendConnectFrameState() throws Exception {
ThreadFactory threadFactory = new DefaultThreadFactory("testReceiveErrorAtSendConnectFrameState");