You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/12 12:24:18 UTC

[GitHub] [pulsar] congbobo184 commented on a change in pull request #8881: [Transaction] Transaction pending ack persistent

congbobo184 commented on a change in pull request #8881:
URL: https://github.com/apache/pulsar/pull/8881#discussion_r630991154



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
##########
@@ -265,13 +265,23 @@ public void removeProducer(Producer producer) {
 
         NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
                 name -> new NonPersistentSubscription(this, subscriptionName));
-
-        try {
-            Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
-                    cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
-            addConsumerToSubscription(subscription, consumer);
+        Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
+                cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
+        addConsumerToSubscription(subscription, consumer).thenAccept(v -> {
             if (!cnx.isActive()) {
-                consumer.close();
+                try {
+                    consumer.close();
+                } catch (BrokerServiceException e) {
+                    if (e instanceof ConsumerBusyException) {
+                        log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
+                                consumerName);
+                    } else if (e instanceof SubscriptionBusyException) {
+                        log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
+                    }
+
+                    decrementUsageCount();
+                    future.completeExceptionally(e);

Review comment:
       the close is Is a synchronous method, so we should catch the exception.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org