You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/18 06:00:43 UTC

[pulsar] branch master updated: [Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7913fe5614f [Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051)
7913fe5614f is described below

commit 7913fe5614fed853f91199f658fe2584f10e38b9
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Aug 18 14:00:35 2022 +0800

    [Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051)
---
 .../apache/pulsar/client/impl/ClientCnxTest.java   | 100 +++++++++++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  10 ++-
 2 files changed, 106 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
new file mode 100644
index 00000000000..7f2d11fd3ad
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Sets;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class ClientCnxTest extends MockedPulsarServiceBaseTest {
+
+    public static final String CLUSTER_NAME = "test";
+    public static final String TENANT = "tnx";
+    public static final String NAMESPACE = TENANT + "/ns1";
+    public static String persistentTopic = "persistent://" + NAMESPACE + "/test";
+    ExecutorService executorService = Executors.newFixedThreadPool(20);
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
+                .serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant(TENANT,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE);
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        this.executorService.shutdown();
+    }
+
+    @Test
+    public void testRemoveAndHandlePendingRequestInCnx() throws Exception {
+
+        String subName = "sub";
+        int operationTimes = 5000;
+        CountDownLatch countDownLatch = new CountDownLatch(operationTimes);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(persistentTopic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        new Thread(() -> {
+            for (int i = 0; i < operationTimes; i++) {
+                executorService.submit(() -> {
+                    consumer.getLastMessageIdAsync().whenComplete((ignore, exception) -> {
+                        countDownLatch.countDown();
+                    });
+                });
+            }
+        }).start();
+
+        for (int i = 0; i < operationTimes; i++) {
+            ClientCnx cnx = ((ConsumerImpl<?>) consumer).getClientCnx();
+            if (cnx != null) {
+                ChannelHandlerContext context = cnx.ctx();
+                if (context != null) {
+                    cnx.ctx().close();
+                }
+            }
+        }
+
+        Awaitility.await().until(() -> {
+            countDownLatch.await();
+            return true;
+        });
+
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 9e1b34c974c..eb39fe53f1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -290,7 +290,11 @@ public class ClientCnx extends PulsarHandler {
                 "Disconnected from server at " + ctx.channel().remoteAddress());
 
         // Fail out all the pending ops
-        pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
+        pendingRequests.forEach((key, future) -> {
+            if (pendingRequests.remove(key, future) && !future.isDone()) {
+                future.completeExceptionally(e);
+            }
+        });
         waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to reconnect
@@ -299,7 +303,6 @@ public class ClientCnx extends PulsarHandler {
         transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this));
         topicListWatchers.forEach((__, watcher) -> watcher.connectionClosed(this));
 
-        pendingRequests.clear();
         waitingLookupRequests.clear();
 
         producers.clear();
@@ -900,8 +903,7 @@ public class ClientCnx extends PulsarHandler {
         if (flush) {
             ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
                 if (!writeFuture.isSuccess()) {
-                    CompletableFuture<?> newFuture = pendingRequests.remove(requestId);
-                    if (newFuture != null && !newFuture.isDone()) {
+                    if (pendingRequests.remove(requestId, future) && !future.isDone()) {
                         log.warn("{} Failed to send {} to broker: {}", ctx.channel(),
                                 requestType.getDescription(), writeFuture.cause().getMessage());
                         future.completeExceptionally(writeFuture.cause());