You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/18 17:40:11 UTC
[pulsar] branch master updated: Fix adding message to list potential issue (#14377)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b22445f Fix adding message to list potential issue (#14377)
b22445f is described below
commit b22445f961da5cf2e7baaac4b3847007f4c6ed59
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800
Fix adding message to list potential issue (#14377)
---
.../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java | 7 +++----
1 file changed, 3 insertions(+), 4 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e09a934..b5599c8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1907,18 +1907,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return CompletableFuture.completedFuture(Collections.emptyList());
}
List<MessageIdData> data = new ArrayList<>(messageIds.size());
- List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+ List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
messageIds.forEach(messageId -> {
CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
- futures.add(future);
- future.thenAccept(sendToDLQ -> {
+ futures.add(future.thenAccept(sendToDLQ -> {
if (!sendToDLQ) {
data.add(new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setLedgerId(messageId.getLedgerId())
.setEntryId(messageId.getEntryId()));
}
- });
+ }));
});
return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
}