You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/08/18 06:07:15 UTC
[pulsar] branch branch-2.10 updated: [Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051)
This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 8b792c49ccc [Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051)
8b792c49ccc is described below
commit 8b792c49ccc3af6e216af4cdfa6212fb1859ba24
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Aug 18 14:00:35 2022 +0800
[Fix][Client] Fixed cnx channel Inactive causing the request fail to time out and fail to return (#17051)
(cherry picked from commit 7913fe5614fed853f91199f658fe2584f10e38b9)
---
.../apache/pulsar/client/impl/ClientCnxTest.java | 100 +++++++++++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 10 ++-
2 files changed, 106 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
new file mode 100644
index 00000000000..7f2d11fd3ad
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Sets;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Regression test for {@link ClientCnx}: requests that are still pending when the
+ * connection's channel is closed must be completed rather than left hanging.
+ */
+@Test(groups = "broker-impl")
+public class ClientCnxTest extends MockedPulsarServiceBaseTest {
+
+    public static final String CLUSTER_NAME = "test";
+    public static final String TENANT = "tnx";
+    public static final String NAMESPACE = TENANT + "/ns1";
+    public static String persistentTopic = "persistent://" + NAMESPACE + "/test";
+    // Worker pool used to issue client requests concurrently with the connection closes.
+    final ExecutorService executorService = Executors.newFixedThreadPool(20);
+
+    /**
+     * Starts the mocked broker and creates the cluster, tenant and namespace used by the test.
+     *
+     * @throws Exception if the broker cannot be started or an admin call fails
+     */
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
+                .serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant(TENANT,
+                new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
+        admin.namespaces().createNamespace(NAMESPACE);
+    }
+
+    /**
+     * Tears down the mocked broker and stops the worker pool.
+     *
+     * @throws Exception if the broker teardown fails
+     */
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        try {
+            super.internalCleanup();
+        } finally {
+            // Stop the worker threads even when the broker teardown throws.
+            this.executorService.shutdownNow();
+        }
+    }
+
+    /**
+     * Issues many {@code getLastMessageIdAsync()} requests while the consumer's channel is
+     * being closed over and over, then verifies that every request future completes
+     * (normally or exceptionally) within Awaitility's default timeout.
+     *
+     * @throws Exception if the consumer cannot subscribe or the requests do not all complete
+     */
+    @Test
+    public void testRemoveAndHandlePendingRequestInCnx() throws Exception {
+
+        String subName = "sub";
+        int operationTimes = 5000;
+        CountDownLatch countDownLatch = new CountDownLatch(operationTimes);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .topic(persistentTopic)
+                .subscriptionName(subName)
+                .subscribe();
+
+        // Submit the requests from a separate thread so that they race with the closes below.
+        new Thread(() -> {
+            for (int i = 0; i < operationTimes; i++) {
+                executorService.submit(() -> {
+                    // Count every completion, successful or not: the test only checks
+                    // that no request is left without an answer.
+                    consumer.getLastMessageIdAsync().whenComplete((ignore, exception) -> {
+                        countDownLatch.countDown();
+                    });
+                });
+            }
+        }).start();
+
+        for (int i = 0; i < operationTimes; i++) {
+            ClientCnx cnx = ((ConsumerImpl<?>) consumer).getClientCnx();
+            if (cnx != null) {
+                ChannelHandlerContext context = cnx.ctx();
+                if (context != null) {
+                    // Close through the reference that was null-checked.
+                    context.close();
+                }
+            }
+        }
+
+        // Poll the latch without blocking so that Awaitility stays in control of the timeout.
+        Awaitility.await().until(() -> countDownLatch.getCount() == 0);
+
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 033365b362b..7fbe1faa38f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -274,7 +274,11 @@ public class ClientCnx extends PulsarHandler {
"Disconnected from server at " + ctx.channel().remoteAddress());
// Fail out all the pending ops
- pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
+ pendingRequests.forEach((key, future) -> {
+ if (pendingRequests.remove(key, future) && !future.isDone()) {
+ future.completeExceptionally(e);
+ }
+ });
waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e));
// Notify all attached producers/consumers so they have a chance to reconnect
@@ -282,7 +286,6 @@ public class ClientCnx extends PulsarHandler {
consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this));
- pendingRequests.clear();
waitingLookupRequests.clear();
producers.clear();
@@ -877,8 +880,7 @@ public class ClientCnx extends PulsarHandler {
if (flush) {
ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
- CompletableFuture<?> newFuture = pendingRequests.remove(requestId);
- if (newFuture != null && !newFuture.isDone()) {
+ if (pendingRequests.remove(requestId, future) && !future.isDone()) {
log.warn("{} Failed to send {} to broker: {}", ctx.channel(),
requestType.getDescription(), writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());