You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 09:04:26 UTC

[pulsar] 22/33: [Java Client] Send CloseConsumer on timeout (#16616)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c2bb5530b48fd15b0cf7b6bc7d2fc6916006d6cf
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Fri Jul 29 21:31:30 2022 -0500

    [Java Client] Send CloseConsumer on timeout (#16616)
    
    (cherry picked from commit 8f316558e2b3204cd197cd61f7173d64987fc918)
---
 .../apache/pulsar/client/api/ClientErrorsTest.java | 47 +++++++++++++++++++++-
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  9 +++++
 site2/docs/developing-binary-protocol.md           |  9 +++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
index 4353297666a..ec4d56e6fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientErrorsTest.java
@@ -240,7 +240,7 @@ public class ClientErrorsTest {
         });
 
         // Create producer should succeed then upon closure, it should reattempt creation. The first request will
-        // timeout, which triggers CloseProducer. The client might send send the third Producer command before the
+        // time out, which triggers CloseProducer. The client might send the third Producer command before the
         // below assertion, so we pass with 2 or 3.
         client.newProducer().topic(topic).create();
         Awaitility.await().until(() -> closeProducerCounter.get() == 1);
@@ -249,6 +249,51 @@ public class ClientErrorsTest {
         mockBrokerService.resetHandleCloseProducer();
     }
 
+    @Test
+    public void testCreatedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+        consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/t1");
+    }
+
+    @Test
+    public void testCreatedPartitionedConsumerSendsCloseConsumerAfterTimeout() throws Exception {
+        consumerCreatedThenFailsRetryTimeout("persistent://prop/use/ns/part-t1");
+    }
+
+    private void consumerCreatedThenFailsRetryTimeout(String topic) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder().serviceUrl(mockBrokerService.getBrokerAddress())
+                .operationTimeout(1, TimeUnit.SECONDS).build();
+        final AtomicInteger subscribeCounter = new AtomicInteger(0);
+        final AtomicInteger closeConsumerCounter = new AtomicInteger(0);
+
+        mockBrokerService.setHandleSubscribe((ctx, subscribe) -> {
+            int subscribeCount = subscribeCounter.incrementAndGet();
+            if (subscribeCount == 1) {
+                ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+                // Trigger reconnect
+                ctx.writeAndFlush(Commands.newCloseConsumer(subscribe.getConsumerId(), -1));
+            } else if (subscribeCount != 2) {
+                // Respond to subsequent requests to prevent timeouts
+                ctx.writeAndFlush(Commands.newSuccess(subscribe.getRequestId()));
+            }
+            // Don't respond to the second Subscribe command to ensure timeout
+        });
+
+        mockBrokerService.setHandleCloseConsumer((ctx, closeConsumer) -> {
+            closeConsumerCounter.incrementAndGet();
+            ctx.writeAndFlush(Commands.newSuccess(closeConsumer.getRequestId()));
+        });
+
+        // Creating the consumer (Subscribe) should succeed; when the broker then closes it, the client re-subscribes.
+        // That second Subscribe gets no response and times out, which triggers CloseConsumer. The client might send
+        // the third Subscribe command before the assertions below run, so we pass with 2 or 3.
+        client.newConsumer().topic(topic).subscriptionName("test").subscribe();
+        Awaitility.await().until(() -> closeConsumerCounter.get() == 1);
+        Awaitility.await().until(() -> subscribeCounter.get() == 2 || subscribeCounter.get() == 3);
+        mockBrokerService.resetHandleSubscribe();
+        mockBrokerService.resetHandleCloseConsumer();
+    }
+
     @Test
     public void testProducerFailDoesNotFailOtherProducer() throws Exception {
         producerFailDoesNotFailOtherProducer("persistent://prop/use/ns/t1", "persistent://prop/use/ns/t2");
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 2bef30012b4..69282f6bea6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -867,6 +867,15 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
                         subscription, cnx.channel().remoteAddress());
 
+                if (e.getCause() instanceof PulsarClientException.TimeoutException) {
+                    // Creating the consumer has timed out. We need to ensure the broker closes the consumer
+                    // in case it was indeed created; otherwise it might prevent a new create-consumer operation,
+                    // since we are not necessarily closing the connection.
+                    long closeRequestId = client.newRequestId();
+                    ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
+                    cnx.sendRequestWithId(cmd, closeRequestId);
+                }
+
                 if (e.getCause() instanceof PulsarClientException
                         && PulsarClientException.isRetriableError(e.getCause())
                         && System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
diff --git a/site2/docs/developing-binary-protocol.md b/site2/docs/developing-binary-protocol.md
index 3caf769ad2c..fdab6c73465 100644
--- a/site2/docs/developing-binary-protocol.md
+++ b/site2/docs/developing-binary-protocol.md
@@ -310,6 +310,10 @@ subscription is not already there, a new one will be created.
 
 ![Consumer](assets/binary-protocol-consumer.png)
 
+If the client does not receive a response indicating consumer creation success or failure,
+the client should first send a command to close the original consumer before sending a
+command to re-attempt consumer creation.
+
 #### Flow control
 
 After the consumer is ready, the client needs to *give permission* to the
@@ -419,6 +423,11 @@ Parameters:
 
 This command behaves the same as [`CloseProducer`](#command-closeproducer)
 
+If the client does not receive a response to a `Subscribe` command within a timeout,
+the client must first send a `CloseConsumer` command before sending another
+`Subscribe` command. The client does not need to await a response to the `CloseConsumer`
+command before sending the next `Subscribe` command.
+
 ##### Command RedeliverUnacknowledgedMessages
 
 A consumer can ask the broker to redeliver some or all of the pending messages