You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/25 04:07:49 UTC

[pulsar] branch branch-2.10 updated: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728) (#17828)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new bbccb62d9d0 [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728) (#17828)
bbccb62d9d0 is described below

commit bbccb62d9d0a0176520bf9ada5e13c251366abc3
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Sat Sep 24 21:07:44 2022 -0700

    [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728) (#17828)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 24 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/TableViewTest.java   |  1 -
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  | 18 +++++++++++++---
 .../org/apache/pulsar/client/impl/ReaderImpl.java  | 22 ++++++++++++++------
 4 files changed, 55 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 6b6bf959483..edb5f0cd88d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -625,6 +626,29 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     }
 
+    @Test
+    void shouldSupportCancellingReadNextAsync() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 3);
+        MultiTopicsReaderImpl<byte[]> reader = (MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create();
+        // given
+        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        Awaitility.await().untilAsserted(() -> {
+            AssertJUnit.assertTrue(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+        });
+
+        // when
+        future.cancel(false);
+
+        // then
+        AssertJUnit.assertFalse(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+    }
+
+
     private void testReadMessages(String topic, boolean enableBatch) throws Exception {
         int numKeys = 9;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index de56b1a5e99..d91248056bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -222,7 +222,6 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         assertEquals(tv.get("key2"), "value2");
     }
 
-
     @Test(timeOut = 30 * 1000)
     // Regression test for making sure partition changes are always periodically checked even after a check returned
     // exceptionally.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 7d22d9d9619..e4f4d448329 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -18,12 +18,12 @@
  */
 package org.apache.pulsar.client.impl;
 
-
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
@@ -40,7 +40,9 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 
+@Slf4j
 public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     private final MultiTopicsConsumerImpl<T> multiTopicsConsumer;
@@ -128,10 +130,20 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        return multiTopicsConsumer.receiveAsync().thenApply(msg -> {
-            multiTopicsConsumer.acknowledgeCumulativeAsync(msg);
+        CompletableFuture<Message<T>> originalFuture = multiTopicsConsumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+            multiTopicsConsumer.acknowledgeCumulativeAsync(msg)
+                    .exceptionally(ex -> {
+                        log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+                                getMultiTopicsConsumer().getSubscription(), msg.getMessageId(), ex);
+                        return null;
+                    });
             return msg;
         });
+        CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+        handler.attachToFuture(result);
+        handler.setCancelAction(() -> originalFuture.cancel(false));
+        return result;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 25776582bbf..d94084a4e91 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -23,6 +23,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.BatchReceivePolicy;
@@ -41,7 +42,9 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 
+@Slf4j
 public class ReaderImpl<T> implements Reader<T> {
     private static final BatchReceivePolicy DISABLED_BATCH_RECEIVE_POLICY = BatchReceivePolicy.builder()
             .timeout(0, TimeUnit.MILLISECONDS)
@@ -157,13 +160,20 @@ public class ReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg);
-           }
+        CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+            consumer.acknowledgeCumulativeAsync(msg)
+                    .exceptionally(ex -> {
+                        log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+                                getConsumer().getSubscription(), msg.getMessageId(), ex);
+                        return null;
+                    });
+            return msg;
         });
-        return receiveFuture;
+        CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+        handler.attachToFuture(result);
+        handler.setCancelAction(() -> originalFuture.cancel(false));
+        return result;
     }
 
     @Override