You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/13 04:55:38 UTC

(pulsar) branch branch-3.2 updated: [fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22164)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 97c639e57fb [fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22164)
97c639e57fb is described below

commit 97c639e57fb6793d3d9709dcc3ab9fd24e3f6e7d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Mar 1 16:02:24 2024 +0800

    [fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22164)
    
    Co-authored-by: zifengmo <38...@users.noreply.github.com>
---
 .../broker/service/BrokerServiceException.java     |  2 ++
 .../pulsar/client/api/MultiTopicsConsumerTest.java | 32 ++++++++++++++++++++++
 .../pulsar/client/api/PulsarClientException.java   | 17 ++++++++++++
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  2 ++
 4 files changed, 53 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 3e77588b245..831d6068e20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -258,6 +258,8 @@ public class BrokerServiceException extends Exception {
             return ServerError.ServiceNotReady;
         } else if (t instanceof TopicNotFoundException) {
             return ServerError.TopicNotFound;
+        } else if (t instanceof SubscriptionNotFoundException) {
+            return ServerError.SubscriptionNotFound;
         } else if (t instanceof IncompatibleSchemaException
             || t instanceof InvalidSchemaDataException) {
             // for backward compatible with old clients, invalid schema data
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 315ce378d69..bb8bab29ad9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -371,4 +371,36 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
         assertTrue(consumer instanceof MultiTopicsConsumerImpl);
         assertTrue(consumer.isConnected());
     }
+
+    @Test(timeOut = 30000)
+    public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException {
+        final var topic1 = newTopicName();
+        final var topic2 = newTopicName();
+        // Turn off automatic subscription creation in the broker configuration for this test.
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+        // Single-topic consumer. NOTE(review): the try branch only checks the consumer type; a failure is not enforced.
+        try {
+            final var singleTopicConsumer = pulsarClient.newConsumer()
+                    .topic(topic1)
+                    .subscriptionName("sub-1")
+                    .isAckReceiptEnabled(true)
+                    .subscribe();
+            assertTrue(singleTopicConsumer instanceof ConsumerImpl);
+        } catch (Throwable t) {
+            assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException);
+        }
+        // Multi-topic consumer: same pattern and same caveat, using two topics and a different subscription name.
+        try {
+            final var multiTopicsConsumer = pulsarClient.newConsumer()
+                    .topics(List.of(topic1, topic2))
+                    .subscriptionName("sub-2")
+                    .isAckReceiptEnabled(true)
+                    .subscribe();
+            assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);
+        } catch (Throwable t) {
+            assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException);
+        }
+        // Re-enable auto creation. NOTE(review): not in a finally block, so skipped if a catch-branch assertion throws.
+        pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+    }
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 007308ec7ab..c460fee11d0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -344,6 +344,22 @@ public class PulsarClientException extends IOException {
         }
     }
 
+    /**
+     * Thrown when a subscription is not found and cannot be created.
+     */
+    public static class SubscriptionNotFoundException extends PulsarClientException {
+        /**
+         * Constructs a {@code SubscriptionNotFoundException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         */
+        public SubscriptionNotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
     /**
      * Lookup exception thrown by Pulsar client.
      */
@@ -1163,6 +1179,7 @@ public class PulsarClientException extends IOException {
                 || t instanceof NotFoundException
                 || t instanceof IncompatibleSchemaException
                 || t instanceof TopicDoesNotExistException
+                || t instanceof SubscriptionNotFoundException
                 || t instanceof UnsupportedAuthenticationException
                 || t instanceof InvalidMessageException
                 || t instanceof InvalidTopicNameException
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 75e84eeca3e..b3444ae393e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1336,6 +1336,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.IncompatibleSchemaException(errorMsg);
         case TopicNotFound:
             return new PulsarClientException.TopicDoesNotExistException(errorMsg);
+        case SubscriptionNotFound:
+            return new PulsarClientException.SubscriptionNotFoundException(errorMsg);
         case ConsumerAssignError:
             return new PulsarClientException.ConsumerAssignException(errorMsg);
         case NotAllowedError: