You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:26 UTC
[pulsar] 22/33: [Java Client] Send CloseConsumer on timeout (#16616)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c2bb5530b48fd15b0cf7b6bc7d2fc6916006d6cf
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Jul 29 21:31:30 2022 -0500
[Java Client] Send CloseConsumer on timeout (#16616)
(cherry picked from commit 8f316558e2b3204cd197cd61f7173d64987fc918)
---
.../apache/pulsar/client/api/ClientErrorsTest.java | 47 +++++++++++++++++++++-
.../apache/pulsar/client/impl/ConsumerImpl.java | 9 +++++
site2/docs/developing-binary-protocol.md | 9 +++++
3 files changed, 64 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 4353297666a..ec4d56e6fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -240,7 +240,7 @@ public class ClientErrorsTest {
});
// Create producer should succeed then upon closure, it should reattempt creation. The first request will
- // timeout, which triggers CloseProducer. The client might send send the third Producer command before the
+ // time out, which triggers CloseProducer. The client might send the third Producer command before the
// below assertion, so we pass with 2 or 3.
client.newProducer().topic(topic).create();
Awaitility.await().until(() -> closeProducerCounter.get() == 1);
@@ -249,6 +249,51 @@ public class ClientErrorsTest {
mockBrokerService.resetHandleCloseProducer();
}
+ @Test
+ public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+ consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
+ }
+
+ @Test
+ public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+ consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
+ }
+
+ private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception {
+ @Cleanup
+ PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
+ .operationTimeout(1, TimeUnit.SECONDS).build();
+ final AtomicInteger subscribeCounter = new AtomicInteger(0);
+ final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
+
+ mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
+ int subscribeCount = subscribeCounter.incrementAndGet();
+ if (subscribeCount == 1) {
+ ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+ // Trigger reconnect
+ ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1));
+ } else if (subscribeCount != 2) {
+ // Respond to subsequent requests to prevent timeouts
+ ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+ }
+ // Don't respond to the second Subscribe command to ensure timeout
+ });
+
+ mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
+ closeConsumerCounter.incrementAndGet();
+ ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
+ });
+
+ // Create consumer (subscribe) should succeed then upon closure, it should reattempt creation. The first
+ // request will time out, which triggers CloseConsumer. The client might send the third Subscribe command before
+ // the below assertion, so we pass with 2 or 3.
+ client.newConsumer().topic(topic).subscriptionName("test").subscribe();
+ Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
+ Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3);
+ mockBrokerService.resetHandleSubscribe();
+ mockBrokerService.resetHandleCloseConsumer();
+ }
+
@Test
public void testProducerFailDoesNotFailOtherProducer() throws Exception {
producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2bef30012b4..69282f6bea6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -867,6 +867,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
subscription, cnx.channel().remoteAddress());
+ if (e.getCause() instanceof PulsarClientException.TimeoutException) {
+ // Creating the consumer has timed out. We need to ensure the broker closes the consumer
+ // in case it was indeed created, otherwise it might prevent new create consumer operation,
+ // since we are not necessarily closing the connection.
+ long closeRequestId = client.newRequestId();
+ ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
+ cnx.sendRequestWithId(cmd, closeRequestId);
+ }
+
if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md
index 3caf769ad2c..fdab6c73465 100644
--- a/site2/docs/developing-binary-protocol.md
+++ b/site2/docs/developing-binary-protocol.md
@@ -310,6 +310,10 @@ subscription is not already there, a new one will be created.
![Consumer](assets/binary-protocol-consumer.png)
+If the client does not receive a response indicating consumer creation success or failure,
+the client should first send a command to close the original consumer before sending a
+command to re-attempt consumer creation.
+
#### Flow control
After the consumer is ready, the client needs to *give permission* to the
@@ -419,6 +423,11 @@ Parameters:
This command behaves the same as [`CloseProducer`](#command-closeproducer)
+If the client does not receive a response to a `Subscribe` command within a timeout,
+the client must first send a `CloseConsumer` command before sending another
+`Subscribe` command. The client does not need to await a response to the `CloseConsumer`
+command before sending the next `Subscribe` command.
+
##### Command RedeliverUnacknowledgedMessages
A consumer can ask the broker to redeliver some or all of the pending messages