You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:42:43 UTC

[pulsar] 01/02: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e333cf97a702e3e17f2467c1b68db667d3aea131
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Thu Sep 22 03:29:14 2022 -0700

    [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 24 +++++++++
 .../apache/pulsar/client/impl/TableViewTest.java   | 60 ++++++++++++++++++++++
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  8 ++-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  | 24 +++++----
 4 files changed, 105 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 6b6bf959483..edb5f0cd88d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.awaitility.Awaitility;
 import org.testng.Assert;
+import org.testng.AssertJUnit;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -625,6 +626,29 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     }
 
+    @Test
+    void shouldSupportCancellingReadNextAsync() throws Exception {
+        String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 3);
+        MultiTopicsReaderImpl<byte[]> reader = (MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .readerName(subscription)
+                .create();
+        // given
+        CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+        Awaitility.await().untilAsserted(() -> {
+            AssertJUnit.assertTrue(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+        });
+
+        // when
+        future.cancel(false);
+
+        // then
+        AssertJUnit.assertFalse(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+    }
+
+
     private void testReadMessages(String topic, boolean enableBatch) throws Exception {
         int numKeys = 9;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 20f510e97e2..8722f649212 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.fail;
 
@@ -29,7 +33,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -42,6 +48,7 @@ import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 /**
@@ -217,4 +224,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         assertEquals(tv1.size(), 1);
         assertEquals(tv.get("key2"), "value2");
     }
+
+    @DataProvider(name = "partitionedTopic")
+    public static Object[][] partitioned() {
+        return new Object[][] {{true}, {false}};
+    }
+
+    @Test(timeOut = 30 * 1000, dataProvider = "partitionedTopic")
+    public void testAck(boolean partitionedTopic) throws Exception {
+        String topic = null;
+        if (partitionedTopic) {
+            topic = "persistent://public/default/tableview-ack-test";
+            admin.topics().createPartitionedTopic(topic, 3);
+        } else {
+            topic = "persistent://public/default/tableview-no-partition-ack-test";
+            admin.topics().createNonPartitionedTopic(topic);
+        }
+
+        @Cleanup
+        TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+                .topic(topic)
+                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+                .create();
+
+        ConsumerBase consumerBase;
+        if (partitionedTopic) {
+            MultiTopicsReaderImpl<String> reader =
+                    ((CompletableFuture<MultiTopicsReaderImpl<String>>) FieldUtils
+                            .readDeclaredField(tv1, "reader", true)).get();
+            consumerBase = spy(reader.getMultiTopicsConsumer());
+            FieldUtils.writeDeclaredField(reader, "multiTopicsConsumer", consumerBase, true);
+        } else {
+            ReaderImpl<String> reader = ((CompletableFuture<ReaderImpl<String>>) FieldUtils
+                    .readDeclaredField(tv1, "reader", true)).get();
+            consumerBase = spy(reader.getConsumer());
+            FieldUtils.writeDeclaredField(reader, "consumer", consumerBase, true);
+        }
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+        int msgCount = 20;
+        for (int i = 0; i < msgCount; i++) {
+            producer.newMessage().key("key:" + i).value("value" + i).send();
+        }
+
+        Awaitility.await()
+                .pollInterval(1, TimeUnit.SECONDS)
+                .atMost(Duration.ofMillis(5000))
+                .untilAsserted(()
+                        -> verify(consumerBase, times(msgCount)).acknowledgeCumulativeAsync(any(MessageId.class)));
+
+
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 3ec95386cb8..cbb921ed9d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 
 @Slf4j
 public class MultiTopicsReaderImpl<T> implements Reader<T> {
@@ -146,7 +147,8 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        return multiTopicsConsumer.receiveAsync().thenApply(msg -> {
+        CompletableFuture<Message<T>> originalFuture = multiTopicsConsumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
             multiTopicsConsumer.acknowledgeCumulativeAsync(msg)
                     .exceptionally(ex -> {
                         log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
@@ -155,6 +157,10 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
                     });
             return msg;
         });
+        CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+        handler.attachToFuture(result);
+        handler.setCancelAction(() -> originalFuture.cancel(false));
+        return result;
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index c4b5263736e..04f8706f21c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
 
 @Slf4j
 public class ReaderImpl<T> implements Reader<T> {
@@ -177,17 +178,20 @@ public class ReaderImpl<T> implements Reader<T> {
 
     @Override
     public CompletableFuture<Message<T>> readNextAsync() {
-        CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
-        receiveFuture.whenComplete((msg, t) -> {
-           if (msg != null) {
-               consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
-                   log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
-                           getConsumer().getSubscription(), msg.getMessageId(), ex);
-                   return null;
-               });
-           }
+        CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
+        CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+            consumer.acknowledgeCumulativeAsync(msg)
+                    .exceptionally(ex -> {
+                        log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+                                getConsumer().getSubscription(), msg.getMessageId(), ex);
+                        return null;
+                    });
+            return msg;
         });
-        return receiveFuture;
+        CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+        handler.attachToFuture(result);
+        handler.setCancelAction(() -> originalFuture.cancel(false));
+        return result;
     }
 
     @Override