You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/12/14 02:10:06 UTC
[pulsar] branch branch-2.10 updated: [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 3996f3dc39e [fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219)
3996f3dc39e is described below
commit 3996f3dc39e38f02d81c0bb0a1d5dbf1036467b9
Author: ZhangJian He <sh...@gmail.com>
AuthorDate: Thu Oct 27 23:01:37 2022 +0800
[fix] [pulsar-client] Fix pendingLookupRequestSemaphore leak when Ser… (#18219)
### Motivation
https://github.com/apache/pulsar/blob/b061c6ac5833c21e483368febebd0d30679a35e1/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java#L748-L774
The `pendingLookupRequestSemaphore` will leak when handleError. There are `LookUpRequestSemaphore` not released when removing it from `pendingRequests`
related PR: #17856
### Modifications
We can't easily release the semaphore in `handleError`, because there are not only `LookUpRequest`. So release the semaphore when LookupException
### Verifying this change
Add unit test case to cover this change
### Documentation
<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
- [ ] `doc-required`
(Your PR needs to update docs and you will update later)
- [x] `doc-not-needed`
bug fixs, no need doc
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have been already added)
(cherry picked from commit fad3cccf87480a7a8c3a938cf5ca539b9a033106)
---
.../org/apache/pulsar/client/impl/ClientCnx.java | 3 +-
.../apache/pulsar/client/impl/ClientCnxTest.java | 43 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index d214f60c87c..1edaa8095de 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -763,7 +763,8 @@ public class ClientCnx extends PulsarHandler {
if (pendingLookupRequestSemaphore.tryAcquire()) {
future.whenComplete((lookupDataResult, throwable) -> {
- if (throwable instanceof ConnectException) {
+ if (throwable instanceof ConnectException
+ || throwable instanceof PulsarClientException.LookupException) {
pendingLookupRequestSemaphore.release();
}
});
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index c46101fd47f..a33d338fa22 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -114,6 +114,49 @@ public class ClientCnxTest {
eventLoop.shutdownGracefully();
}
+ @Test
+ public void testPendingLookupRequestSemaphoreServiceNotReady() throws Exception {
+ EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(10_000);
+ conf.setKeepAliveIntervalSeconds(0);
+ ClientCnx cnx = new ClientCnx(conf, eventLoop);
+
+ ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+ Channel channel = mock(Channel.class);
+ when(ctx.channel()).thenReturn(channel);
+ ChannelFuture listenerFuture = mock(ChannelFuture.class);
+ when(listenerFuture.addListener(any())).thenReturn(listenerFuture);
+ when(ctx.writeAndFlush(any())).thenReturn(listenerFuture);
+ cnx.channelActive(ctx);
+ cnx.state = ClientCnx.State.Ready;
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ CompletableFuture<Exception> completableFuture = new CompletableFuture<>();
+ new Thread(() -> {
+ try {
+ Thread.sleep(1_000);
+ CompletableFuture<BinaryProtoLookupService.LookupDataResult> future =
+ cnx.newLookup(null, 123);
+ countDownLatch.countDown();
+ future.get();
+ } catch (Exception e) {
+ completableFuture.complete(e);
+ }
+ }).start();
+ countDownLatch.await();
+ CommandError commandError = new CommandError();
+ commandError.setRequestId(123L);
+ commandError.setError(ServerError.ServiceNotReady);
+ commandError.setMessage("Service not ready");
+ cnx.handleError(commandError);
+ assertTrue(completableFuture.get().getCause() instanceof PulsarClientException.LookupException);
+ // wait for subsequent calls over
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(cnx.getPendingLookupRequestSemaphore().availablePermits(), conf.getConcurrentLookupRequest());
+ });
+ eventLoop.shutdownGracefully();
+ }
+
@Test
public void testPendingWaitingLookupRequestSemaphore() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, false, new DefaultThreadFactory("testClientCnxTimeout"));