You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/18 07:59:32 UTC
[pulsar] 10/16: Fix race condition in consumer redelivery (#14687)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 878a5527f74cf3cca29a371c6bfb5cb88169cabd
Author: 萧易客 <km...@live.com>
AuthorDate: Tue Mar 15 17:15:56 2022 +0800
Fix race condition in consumer redelivery (#14687)
(cherry picked from commit dd9bcbe3c00d20e6c3aa63ef5c5235f88659a88c)
---
.../org/apache/pulsar/client/impl/ConsumerImpl.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 856a775..d19b980 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1770,20 +1770,20 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
if (messageIds == null || messageIds.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
- List<MessageIdData> data = new ArrayList<>(messageIds.size());
- List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
- messageIds.forEach(messageId -> {
+ List<CompletableFuture<MessageIdData>> futures = messageIds.stream().map(messageId -> {
CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
- futures.add(future.thenAccept(sendToDLQ -> {
+ return future.thenApply(sendToDLQ -> {
if (!sendToDLQ) {
- data.add(new MessageIdData()
+ return new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setLedgerId(messageId.getLedgerId())
- .setEntryId(messageId.getEntryId()));
+ .setEntryId(messageId.getEntryId());
}
- }));
- });
- return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
+ return null;
+ });
+ }).collect(Collectors.toList());
+ return FutureUtil.waitForAll(futures).thenApply(v ->
+ futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
}
private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdImpl messageId) {