You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/02/18 17:40:11 UTC

[pulsar] branch master updated: Fix potential issue when adding message to list (#14377)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b22445f  Fix potential issue when adding message to list (#14377)
b22445f is described below

commit b22445f961da5cf2e7baaac4b3847007f4c6ed59
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Feb 19 01:38:24 2022 +0800

    Fix potential issue when adding message to list (#14377)
---
 .../src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java  | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index e09a934..b5599c8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1907,18 +1907,17 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
             return CompletableFuture.completedFuture(Collections.emptyList());
         }
         List<MessageIdData> data = new ArrayList<>(messageIds.size());
-        List<CompletableFuture<Boolean>> futures = new ArrayList<>(messageIds.size());
+        List<CompletableFuture<Void>> futures = new ArrayList<>(messageIds.size());
         messageIds.forEach(messageId -> {
             CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
-            futures.add(future);
-            future.thenAccept(sendToDLQ -> {
+            futures.add(future.thenAccept(sendToDLQ -> {
                 if (!sendToDLQ) {
                     data.add(new MessageIdData()
                             .setPartition(messageId.getPartitionIndex())
                             .setLedgerId(messageId.getLedgerId())
                             .setEntryId(messageId.getEntryId()));
                 }
-            });
+            }));
         });
         return FutureUtil.waitForAll(futures).thenCompose(v -> CompletableFuture.completedFuture(data));
     }