You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/30 02:31:35 UTC
[pulsar] branch master updated: [Java Client] Send CloseConsumer on timeout (#16616)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8f316558e2b [Java Client] Send CloseConsumer on timeout (#16616)
8f316558e2b is described below
commit 8f316558e2b3204cd197cd61f7173d64987fc918
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Jul 29 21:31:30 2022 -0500
[Java Client] Send CloseConsumer on timeout (#16616)
---
.../apache/pulsar/client/api/ClientErrorsTest.java | 47 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++
site2/docs/developing-binary-protocol.md | 9 +++++
3 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 8c8e4b96add..62e459dda0d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -240,7 +240,7 @@ public class ClientErrorsTest {
});
// Create producer should succeed then upon closure, it should reattempt creation. The first request will
- // timeout, which triggers CloseProducer. The client might send send the third Producer command before the
+ // time out, which triggers CloseProducer. The client might send the third Producer command before the
// below assertion, so we pass with 2 or 3.
client.newProducer().topic(topic).create();
Awaitility.await().until(() -> closeProducerCounter.get() == 1);
@@ -249,6 +249,51 @@ public class ClientErrorsTest {
mockBrokerService.resetHandleCloseProducer();
}
+ @Test
+ public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+ consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
+ }
+
+ @Test
+ public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+ consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
+ }
+
+ private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
+ .operationTimeout(1, TimeUnit.SECONDS).build();
+ final AtomicInteger subscribeCounter = new AtomicInteger(0);
+ final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
+
+ mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
+ int subscribeCount = subscribeCounter.incrementAndGet();
+ if (subscribeCount == 1) {
+ ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+ // Trigger reconnect
+ ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1));
+ } else if (subscribeCount != 2) {
+ // Respond to subsequent requests to prevent timeouts
+ ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+ }
+ // Don't respond to the second Subscribe command to ensure timeout
+ });
+
+ mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
+ closeConsumerCounter.incrementAndGet();
+ ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
+ });
+
+ // Create consumer (subscribe) should succeed; then, upon closure, the client should reattempt creation. The
+ // first reattempt (the second Subscribe) will time out, which triggers CloseConsumer. The client might send the
+ // third Subscribe command before the below assertion, so we pass with 2 or 3.
+ client.newConsumer().topic(topic).subscriptionName("test").subscribe();
+ Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
+ Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3);
+ mockBrokerService.resetHandleSubscribe();
+ mockBrokerService.resetHandleCloseConsumer();
+ }
+
@Test
public void testProducerFailDoesNotFailOtherProducer() throws Exception {
producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2955065f313..4d858ae6263 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -900,6 +900,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
subscription, cnx.channel().remoteAddress());
+ if (e.getCause() instanceof PulsarClientException.TimeoutException) {
+ // Creating the consumer has timed out. We need to ensure the broker closes the consumer
+ // in case it was indeed created; otherwise, it might prevent a new create-consumer operation,
+ // since we are not necessarily closing the connection.
+ long closeRequestId = client.newRequestId();
+ ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
+ cnx.sendRequestWithId(cmd, closeRequestId);
+ }
+
if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md
index 07460987d2e..1e4ebe62817 100644
--- a/site2/docs/developing-binary-protocol.md
+++ b/site2/docs/developing-binary-protocol.md
@@ -333,6 +333,10 @@ Before creating or connecting a consumer, you need to perform [topic lookup](#to
:::
+If the client does not receive a response indicating consumer creation success or failure,
+the client should first send a command to close the original consumer before sending a
+command to re-attempt consumer creation.
+
#### Flow control
After the consumer is ready, the client needs to *give permission* to the
@@ -479,6 +483,11 @@ This command can be sent by either producer or broker.
This command behaves the same as [`CloseProducer`](#command-closeproducer)
+If the client does not receive a response to a `Subscribe` command within a timeout,
+the client must first send a `CloseConsumer` command before sending another
+`Subscribe` command. The client does not need to await a response to the `CloseConsumer`
+command before sending the next `Subscribe` command.
+
##### Command RedeliverUnacknowledgedMessages
A consumer can ask the broker to redeliver some or all of the pending messages