You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 01:33:52 UTC

[pulsar] 16/18: Use sendAsync instead of send when produce message to retry topic. (#12946)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit ab57a6a60854ca6dc225e53323adedc3c1322716
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Nov 30 01:46:53 2021 +0800

    Use sendAsync instead of send when produce message to retry topic. (#12946)
    
    * Use sendAsync instead of send when producing messages to the retry letter topic.
    
    * Add an exception handler.
    
    (cherry picked from commit 09cc1d6aa91422c71601664dac8d94ba574beb7b)
---
 .../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1b8d19c..ff0b826 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -590,9 +590,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
 
+                MessageId finalMessageId = messageId;
                 if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
                     initDeadLetterProducerIfNeeded();
-                    MessageId finalMessageId = messageId;
                     deadLetterProducer.thenAccept(dlqProducer -> {
                         TypedMessageBuilder<byte[]> typedMessageBuilderNew =
                                 dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
@@ -624,8 +624,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     if (message.hasKey()) {
                         typedMessageBuilderNew.key(message.getKey());
                     }
-                    typedMessageBuilderNew.send();
-                    return doAcknowledge(messageId, ackType, properties, null);
+                    typedMessageBuilderNew.sendAsync()
+                            .thenAccept(__ -> doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> result.complete(null)))
+                            .exceptionally(ex -> {
+                                result.completeExceptionally(ex);
+                                return null;
+                            });
                 }
             } catch (Exception e) {
                 log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);