You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/01 08:49:41 UTC

[GitHub] [pulsar] Shawyeok commented on a change in pull request #9102: [pulsar-client] Fix redeliverUnacknowledgedMessages may block timer thread cause serious problem

Shawyeok commented on a change in pull request #9102:
URL: https://github.com/apache/pulsar/pull/9102#discussion_r550747420



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
##########
@@ -1843,18 +1812,29 @@ private boolean processPossibleToDLQ(MessageIdImpl messageId) {
                 }
             }
             if (deadLetterProducer != null) {
-                try {
-                    for (MessageImpl<T> message : deadLetterMessages) {
-                        deadLetterProducer.newMessage()
-                                .value(message.getValue())
-                                .properties(message.getProperties())
-                                .send();
-                    }
-                    acknowledge(messageId);
-                    return true;
-                } catch (Exception e) {
-                    log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
+                for (MessageImpl<T> message : deadLetterMessages) {
+                    deadLetterProducer.newMessage()
+                            .value(message.getValue())
+                            .properties(message.getProperties())
+                            .sendAsync()
+                            .whenComplete((msgId, ex) -> {
+                                if (ex != null) {
+                                    log.error("Send to dead letter topic exception with topic: {}, messageId: {}",
+                                            deadLetterProducer.getTopic(), messageId, ex);
+                                    return;
+                                }
+
+                                log.debug("Redeliver dead letter message topic: {}, messageId: {} to dead letter topic with topic: {}, messageId: {}",
+                                        message.getTopicName(), messageId, deadLetterProducer.getTopic(), msgId);
+                                try {
+                                    acknowledge(messageId);
+                                } catch (Exception e) {
+                                    log.error("Acknowledge dead letter message exception with topic: {}, messageId: {}",
+                                            message.getTopicName(), messageId);
+                                }
+                            });
                 }
+                return true;

Review comment:
       Right, can't simply return true here, but make the method `processPossibleToDLQ` asynchronous will bring many other changes to fit async.
   https://github.com/apache/pulsar/blob/7a9ec066e1cc3ca918240776c4c9f805b0a92e96/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1787-L1794
   https://github.com/apache/pulsar/blob/7a9ec066e1cc3ca918240776c4c9f805b0a92e96/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L701-L703
   
   How about re-put message into the `UnAckedMessageTracker` when sent failed? So these messages will be retry in later tick.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org