You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2024/03/13 04:55:38 UTC
(pulsar) branch branch-3.2 updated: [fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22164)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 97c639e57fb [fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22164)
97c639e57fb is described below
commit 97c639e57fb6793d3d9709dcc3ab9fd24e3f6e7d
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Mar 1 16:02:24 2024 +0800
[fix] [client] Do no retrying for error subscription not found when disabled allowAutoSubscriptionCreation (#22164)
Co-authored-by: zifengmo <38...@users.noreply.github.com>
---
.../broker/service/BrokerServiceException.java | 2 ++
.../pulsar/client/api/MultiTopicsConsumerTest.java | 32 ++++++++++++++++++++++
.../pulsar/client/api/PulsarClientException.java | 17 ++++++++++++
.../org/apache/pulsar/client/impl/ClientCnx.java | 2 ++
4 files changed, 53 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 3e77588b245..831d6068e20 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -258,6 +258,8 @@ public class BrokerServiceException extends Exception {
return ServerError.ServiceNotReady;
} else if (t instanceof TopicNotFoundException) {
return ServerError.TopicNotFound;
+ } else if (t instanceof SubscriptionNotFoundException) {
+ return ServerError.SubscriptionNotFound;
} else if (t instanceof IncompatibleSchemaException
|| t instanceof InvalidSchemaDataException) {
// for backward compatible with old clients, invalid schema data
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
index 315ce378d69..bb8bab29ad9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java
@@ -371,4 +371,36 @@ public class MultiTopicsConsumerTest extends ProducerConsumerBase {
assertTrue(consumer instanceof MultiTopicsConsumerImpl);
assertTrue(consumer.isConnected());
}
+
+ @Test(timeOut = 30000)
+ public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClientException {
+ final var topic1 = newTopicName();
+ final var topic2 = newTopicName();
+
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
+
+ try {
+ final var singleTopicConsumer = pulsarClient.newConsumer()
+ .topic(topic1)
+ .subscriptionName("sub-1")
+ .isAckReceiptEnabled(true)
+ .subscribe();
+ assertTrue(singleTopicConsumer instanceof ConsumerImpl);
+ } catch (Throwable t) {
+ assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException);
+ }
+
+ try {
+ final var multiTopicsConsumer = pulsarClient.newConsumer()
+ .topics(List.of(topic1, topic2))
+ .subscriptionName("sub-2")
+ .isAckReceiptEnabled(true)
+ .subscribe();
+ assertTrue(multiTopicsConsumer instanceof MultiTopicsConsumerImpl);
+ } catch (Throwable t) {
+ assertTrue(t.getCause().getCause() instanceof PulsarClientException.SubscriptionNotFoundException);
+ }
+
+ pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
+ }
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index 007308ec7ab..c460fee11d0 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -344,6 +344,22 @@ public class PulsarClientException extends IOException {
}
}
+ /**
+ * Not found subscription that cannot be created.
+ */
+ public static class SubscriptionNotFoundException extends PulsarClientException {
+ /**
+ * Constructs an {@code SubscriptionNotFoundException} with the specified detail message.
+ *
+ * @param msg
+ * The detail message (which is saved for later retrieval
+ * by the {@link #getMessage()} method)
+ */
+ public SubscriptionNotFoundException(String msg) {
+ super(msg);
+ }
+ }
+
/**
* Lookup exception thrown by Pulsar client.
*/
@@ -1163,6 +1179,7 @@ public class PulsarClientException extends IOException {
|| t instanceof NotFoundException
|| t instanceof IncompatibleSchemaException
|| t instanceof TopicDoesNotExistException
+ || t instanceof SubscriptionNotFoundException
|| t instanceof UnsupportedAuthenticationException
|| t instanceof InvalidMessageException
|| t instanceof InvalidTopicNameException
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 75e84eeca3e..b3444ae393e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -1336,6 +1336,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case TopicNotFound:
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
+ case SubscriptionNotFound:
+ return new PulsarClientException.SubscriptionNotFoundException(errorMsg);
case ConsumerAssignError:
return new PulsarClientException.ConsumerAssignException(errorMsg);
case NotAllowedError: