You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/18 15:57:10 UTC

[pulsar] 16/27: Async the DLQ process (#9552)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fcfb2b5748668aa1d95187c21f5f3d2dad7344ad
Author: lipenghui <pe...@apache.org>
AuthorDate: Thu Feb 11 09:48:37 2021 +0800

    Async the DLQ process (#9552)
    
    Fixes #9540
    
    Make the DLQ process asynchronous. Currently, the DLQ process is synchronous. The DLQ is processed in the timer task, and the timer holds a write lock while the data is being written to the DLQ. Writing that data uses the IO thread, and the messages that are added to the UnAckedMessageTracker also use the IO thread and also acquire the same write lock. This results in a deadlock.
    
    (cherry picked from commit fb0f3e39cf1d6eaefb58825291f090cdfe2c4904)
---
 .../apache/pulsar/client/impl/ConsumerImpl.java    | 199 +++++++++++++--------
 .../pulsar/client/impl/UnAckedMessageTracker.java  |  13 +-
 2 files changed, 129 insertions(+), 83 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index eb7d7d2..d4055b0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -37,7 +37,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -169,7 +168,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     private final DeadLetterPolicy deadLetterPolicy;
 
-    private volatile Producer<T> deadLetterProducer;
+    private volatile CompletableFuture<Producer<T>> deadLetterProducer;
 
     private volatile Producer<T> retryLetterProducer;
     private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
@@ -672,6 +671,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 createProducerLock.writeLock().unlock();
             }
         }
+        CompletableFuture<Void> result = new CompletableFuture<>();
         if (retryLetterProducer != null) {
             try {
                 MessageImpl<T> retryMessage = null;
@@ -704,33 +704,34 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumetimes));
                 propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, String.valueOf(unit.toMillis(delayTime)));
 
-               if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount()) {
-                   processPossibleToDLQ((MessageIdImpl)messageId);
-                    if (deadLetterProducer == null) {
-                        try {
-                            createProducerLock.writeLock().lock();
-                            if (deadLetterProducer == null) {
-                                deadLetterProducer = client.newProducer(schema)
-                                        .topic(this.deadLetterPolicy
-                                        .getDeadLetterTopic())
-                                        .blockIfQueueFull(false)
-                                        .create();
-                            }
-                        } catch (Exception e) {
-                           log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
-                        } finally {
-                           createProducerLock.writeLock().unlock();
-                        }
-                   }
-                   if (deadLetterProducer != null) {
-                       propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
-                       propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
-                       TypedMessageBuilder<T> typedMessageBuilderNew = deadLetterProducer.newMessage()
-                               .value(retryMessage.getValue())
-                               .properties(propertiesMap);
-                       typedMessageBuilderNew.send();
-                       return doAcknowledge(messageId, ackType, properties, null);
-                   }
+                if (reconsumetimes > this.deadLetterPolicy.getMaxRedeliverCount() && StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                    initDeadLetterProducerIfNeeded();
+                    MessageId finalMessageId = messageId;
+                    String finalOriginTopicNameStr = originTopicNameStr;
+                    String finalOriginMessageIdStr = originMessageIdStr;
+                    MessageImpl<T> finalRetryMessage = retryMessage;
+                    deadLetterProducer.thenAccept(dlqProducer -> {
+                        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, finalOriginTopicNameStr);
+                        propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, finalOriginMessageIdStr);
+                        TypedMessageBuilder<T> typedMessageBuilderNew = dlqProducer.newMessage()
+                                .value(finalRetryMessage.getValue())
+                                .properties(propertiesMap);
+                        typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
+                            doAcknowledge(finalMessageId, ackType, properties, null).thenAccept(v -> {
+                                result.complete(null);
+                            }).exceptionally(ex -> {
+                                result.completeExceptionally(ex);
+                                return null;
+                            });
+                        }).exceptionally(ex -> {
+                            result.completeExceptionally(ex);
+                            return null;
+                        });
+                    }).exceptionally(ex -> {
+                        result.completeExceptionally(ex);
+                        deadLetterProducer = null;
+                        return null;
+                    });
                 } else {
                     TypedMessageBuilder<T> typedMessageBuilderNew = retryLetterProducer.newMessage()
                             .value(retryMessage.getValue())
@@ -745,14 +746,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     return doAcknowledge(messageId, ackType, properties, null);
                 }
             } catch (Exception e) {
-                log.error("Send to retry letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
-                Set<MessageId> messageIds = new HashSet<>();
-                messageIds.add(messageId);
+                log.error("Send to retry letter topic exception with topic: {}, messageId: {}", retryLetterProducer.getTopic(), messageId, e);
+                Set<MessageId> messageIds = Collections.singleton(messageId);
                 unAckedMessageTracker.remove(messageId);
                 redeliverUnacknowledgedMessages(messageIds);
             }
         }
-        return CompletableFuture.completedFuture(null);
+        MessageId finalMessageId = messageId;
+        result.exceptionally(ex -> {
+            Set<MessageId> messageIds = Collections.singleton(finalMessageId);
+            unAckedMessageTracker.remove(finalMessageId);
+            redeliverUnacknowledgedMessages(messageIds);
+            return null;
+        });
+        return result;
     }
 
     // TODO: handle transactional acknowledgements.
@@ -1790,19 +1797,12 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     .collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED);
             MessageIdData.Builder builder = MessageIdData.newBuilder();
             batches.forEach(ids -> {
-                List<MessageIdData> messageIdDatas = ids.stream()
-                    .filter(messageId -> !processPossibleToDLQ(messageId))
-                    .map(messageId -> {
-                            builder.setPartition(messageId.getPartitionIndex());
-                            builder.setLedgerId(messageId.getLedgerId());
-                            builder.setEntryId(messageId.getEntryId());
-                            return builder.build();
-                        }).collect(Collectors.toList());
-                if (!messageIdDatas.isEmpty()) {
-                    ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdDatas);
-                    cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
-                    messageIdDatas.forEach(MessageIdData::recycle);
-                }
+                getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
+                    if (!messageIdData.isEmpty()) {
+                        ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
+                        cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
+                    }
+                });
             });
             if (messagesFromQueue > 0) {
                 increaseAvailablePermits(cnx, messagesFromQueue);
@@ -1827,48 +1827,91 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         notifyPendingBatchReceivedCallBack(op);
     }
 
-    private boolean processPossibleToDLQ(MessageIdImpl messageId) {
+    private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<MessageIdImpl> messageIds) {
+        if (messageIds == null || messageIds.isEmpty()) {
+            return CompletableFuture.completedFuture(Collections.emptyList());
+        }
+        List<MessageIdData> data = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        messageIds.forEach(messageId ->  {
+            CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
+            futures.add(future);
+            future.thenAccept(sendToDLQ -> {
+                if (!sendToDLQ) {
+                    data.add(MessageIdData.newBuilder()
+                            .setPartition(messageId.getPartitionIndex())
+                            .setLedgerId(messageId.getLedgerId())
+                            .setEntryId(messageId.getEntryId()).build());
+                }
+            });
+        });
+        return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
+    }
+
+    private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId) {
         List<MessageImpl<T>> deadLetterMessages = null;
         if (possibleSendToDeadLetterTopicMessages != null) {
             if (messageId instanceof BatchMessageIdImpl) {
-                deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
-                        getPartitionIndex()));
-            } else {
-                deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId);
+                messageId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
+                        getPartitionIndex());
             }
+            deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(messageId);
         }
+        CompletableFuture<Boolean> result = new CompletableFuture<>();
         if (deadLetterMessages != null) {
-            if (deadLetterProducer == null) {
-                try {
-                    createProducerLock.writeLock().lock();
-                    if (deadLetterProducer == null) {
-                        deadLetterProducer = client.newProducer(schema)
-                                .topic(this.deadLetterPolicy.getDeadLetterTopic())
-                                .blockIfQueueFull(false)
-                                .create();
-                    }
-                } catch (Exception e) {
-                    log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
-                } finally {
-                    createProducerLock.writeLock().unlock();
+            initDeadLetterProducerIfNeeded();
+            List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
+            MessageIdImpl finalMessageId = messageId;
+            deadLetterProducer.thenAccept(producerDLQ -> {
+                for (MessageImpl<T> message : finalDeadLetterMessages) {
+                    producerDLQ.newMessage()
+                            .value(message.getValue())
+                            .properties(message.getProperties())
+                            .sendAsync()
+                            .thenAccept(messageIdInDLQ -> {
+                                possibleSendToDeadLetterTopicMessages.remove(finalMessageId);
+                                acknowledgeAsync(finalMessageId).whenComplete((v, ex) -> {
+                                    if (ex != null) {
+                                        log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original topic but send to the DLQ successfully.",
+                                                topicName, subscription, consumerName, finalMessageId, ex);
+                                    } else {
+                                        result.complete(true);
+                                    }
+                                });
+                            }).exceptionally(ex -> {
+                                log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}",
+                                        topicName, subscription, consumerName, finalMessageId, ex);
+                                result.complete(false);
+                                return null;
+                    });
                 }
-            }
-            if (deadLetterProducer != null) {
-                try {
-                    for (MessageImpl<T> message : deadLetterMessages) {
-                        deadLetterProducer.newMessage()
-                                .value(message.getValue())
-                                .properties(message.getProperties())
-                                .send();
-                    }
-                    acknowledge(messageId);
-                    return true;
-                } catch (Exception e) {
-                    log.error("Send to dead letter topic exception with topic: {}, messageId: {}", deadLetterProducer.getTopic(), messageId, e);
+            }).exceptionally(ex -> {
+                deadLetterProducer = null;
+                result.complete(false);
+                return null;
+            });
+        } else {
+            result.complete(false);
+        }
+        return result;
+    }
+
+    private void initDeadLetterProducerIfNeeded() {
+        if (deadLetterProducer == null) {
+            try {
+                createProducerLock.writeLock().lock();
+                if (deadLetterProducer == null) {
+                    deadLetterProducer = client.newProducer(schema)
+                            .topic(this.deadLetterPolicy.getDeadLetterTopic())
+                            .blockIfQueueFull(false)
+                            .createAsync();
                 }
+            } catch (Exception e) {
+                log.error("Create dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), e);
+            } finally {
+                createProducerLock.writeLock().unlock();
             }
         }
-        return false;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index e1a9bf9..f41ffcd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -139,12 +139,15 @@ public class UnAckedMessageTracker implements Closeable {
                     headPartition.clear();
                     timePartitions.addLast(headPartition);
                 } finally {
-                    if (messageIds.size() > 0) {
-                        consumerBase.onAckTimeoutSend(messageIds);
-                        consumerBase.redeliverUnacknowledgedMessages(messageIds);
+                    try {
+                        if (messageIds.size() > 0) {
+                            consumerBase.onAckTimeoutSend(messageIds);
+                            consumerBase.redeliverUnacknowledgedMessages(messageIds);
+                        }
+                        timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
+                    } finally {
+                        writeLock.unlock();
                     }
-                    timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
-                    writeLock.unlock();
                 }
             }
         }, this.tickDurationInMs, TimeUnit.MILLISECONDS);