Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:16 UTC
[pulsar] 30/38: [Broker] Handle all exceptions from `topic.addProducer` (#6881)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1856f2f8a41f092d5758f88f230c4a3b32e59f85
Author: Addison Higham <ad...@gmail.com>
AuthorDate: Wed May 6 01:15:19 2020 -0600
[Broker] Handle all exceptions from `topic.addProducer` (#6881)
Fixes #6872
Fixes #6416
If a client tries to create a producer on a topic that is currently
unloading, `BrokerService.checkTopicNsOwnership` can throw a
`RuntimeException`, which bubbles up through `topic.addProducer`.
Because only `BrokerServiceException` was handled, the producer future
never completed, and producers could not be created if the topic was
later scheduled back to this broker.
(cherry picked from commit 30e26f84fbd35c2f88665195c426a73336a6fc1f)
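
The failure mode described in the commit message can be reproduced in isolation. The following is a minimal sketch, not the broker code: it assumes the `addProducer` call runs inside a `CompletableFuture` callback, and `ServiceException`, `addProducer`, `registerNarrow`, and `registerBroad` are hypothetical stand-ins introduced only for illustration.

```java
import java.util.concurrent.CompletableFuture;

public class ProducerFutureSketch {

    // Hypothetical stand-in for a checked service exception such as BrokerServiceException.
    static class ServiceException extends Exception {
        ServiceException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for topic.addProducer: the ownership check throws an
    // unchecked exception, as it might while the topic is unloading.
    static void addProducer(String topic) throws ServiceException {
        throw new RuntimeException("topic " + topic + " is not served by this broker");
    }

    // Catches only the checked exception. The RuntimeException escapes the lambda and
    // is captured by the discarded stage returned from thenAccept, so producerFuture
    // is never completed.
    static CompletableFuture<String> registerNarrow(String topic) {
        CompletableFuture<String> producerFuture = new CompletableFuture<>();
        CompletableFuture.completedFuture(topic).thenAccept(t -> {
            try {
                addProducer(t);
                producerFuture.complete("producer");
            } catch (ServiceException e) {
                producerFuture.completeExceptionally(e);
            }
        });
        return producerFuture;
    }

    // Catches every Exception, so producerFuture is always completed one way or the other.
    static CompletableFuture<String> registerBroad(String topic) {
        CompletableFuture<String> producerFuture = new CompletableFuture<>();
        CompletableFuture.completedFuture(topic).thenAccept(t -> {
            try {
                addProducer(t);
                producerFuture.complete("producer");
            } catch (Exception e) {
                producerFuture.completeExceptionally(e);
            }
        });
        return producerFuture;
    }

    public static void main(String[] args) {
        System.out.println("narrow catch, done: " + registerNarrow("my-topic").isDone());
        System.out.println("broad catch, done: " + registerBroad("my-topic").isDone());
    }
}
```

In this sketch the narrow catch leaves the returned future pending forever, while the broad catch completes it exceptionally so a caller can observe the failure and retry.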
---
.../src/main/java/org/apache/pulsar/broker/service/ServerCnx.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index ce8411e..db61f8d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -993,7 +993,7 @@ public class ServerCnx extends PulsarHandler {
ServerError error = null;
if(!existingProducerFuture.isDone()) {
error = ServerError.ServiceNotReady;
- }else {
+ } else {
error = getErrorCode(existingProducerFuture);
// remove producer with producerId as it's already completed with exception
producers.remove(producerId);
@@ -1077,7 +1077,7 @@ public class ServerCnx extends PulsarHandler {
producerFuture.completeExceptionally(
new IllegalStateException("Producer created after connection was closed"));
}
- } catch (BrokerServiceException ise) {
+ } catch (Exception ise) {
log.error("[{}] Failed to add producer to topic {}: {}", remoteAddress, topicName,
ise.getMessage());
ctx.writeAndFlush(Commands.newError(requestId,