You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/21 01:33:52 UTC
[pulsar] 16/18: Use sendAsync instead of send when producing messages to the retry topic. (#12946)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ab57a6a60854ca6dc225e53323adedc3c1322716
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Tue Nov 30 01:46:53 2021 +0800
Use sendAsync instead of send when producing messages to the retry topic. (#12946)
* Use sendAsync instead of send when producing messages to the retry letter topic.
* Add an exception handler.
(cherry picked from commit 09cc1d6aa91422c71601664dac8d94ba574beb7b)
---
.../main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 1b8d19c..ff0b826 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -590,9 +590,9 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
+ MessageId finalMessageId = messageId;
if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded();
- MessageId finalMessageId = messageId;
deadLetterProducer.thenAccept(dlqProducer -> {
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
@@ -624,8 +624,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
- typedMessageBuilderNew.send();
- return doAcknowledge(messageId, ackType, properties, null);
+ typedMessageBuilderNew.sendAsync()
+ .thenAccept(__ -> doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> result.complete(null)))
+ .exceptionally(ex -> {
+ result.completeExceptionally(ex);
+ return null;
+ });
}
} catch (Exception e) {
log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);