You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/09/26 01:42:43 UTC
[pulsar] 01/02: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e333cf97a702e3e17f2467c1b68db667d3aea131
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Thu Sep 22 03:29:14 2022 -0700
[fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728)
---
.../pulsar/client/impl/MultiTopicsReaderTest.java | 24 +++++++++
.../apache/pulsar/client/impl/TableViewTest.java | 60 ++++++++++++++++++++++
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 8 ++-
.../org/apache/pulsar/client/impl/ReaderImpl.java | 24 +++++----
4 files changed, 105 insertions(+), 11 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 6b6bf959483..edb5f0cd88d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -625,6 +626,29 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
}
+ @Test
+ void shouldSupportCancellingReadNextAsync() throws Exception {
+ String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 3);
+ MultiTopicsReaderImpl<byte[]> reader = (MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .readerName(subscription)
+ .create();
+ // given
+ CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+ Awaitility.await().untilAsserted(() -> {
+ AssertJUnit.assertTrue(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+ });
+
+ // when
+ future.cancel(false);
+
+ // then
+ AssertJUnit.assertFalse(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+ }
+
+
private void testReadMessages(String topic, boolean enableBatch) throws Exception {
int numKeys = 9;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 20f510e97e2..8722f649212 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
@@ -29,7 +33,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
@@ -42,6 +48,7 @@ import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
/**
@@ -217,4 +224,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
assertEquals(tv1.size(), 1);
assertEquals(tv.get("key2"), "value2");
}
+
+ @DataProvider(name = "partitionedTopic")
+ public static Object[][] partitioned() {
+ return new Object[][] {{true}, {false}};
+ }
+
+ @Test(timeOut = 30 * 1000, dataProvider = "partitionedTopic")
+ public void testAck(boolean partitionedTopic) throws Exception {
+ String topic = null;
+ if (partitionedTopic) {
+ topic = "persistent://public/default/tableview-ack-test";
+ admin.topics().createPartitionedTopic(topic, 3);
+ } else {
+ topic = "persistent://public/default/tableview-no-partition-ack-test";
+ admin.topics().createNonPartitionedTopic(topic);
+ }
+
+ @Cleanup
+ TableView<String> tv1 = pulsarClient.newTableViewBuilder(Schema.STRING)
+ .topic(topic)
+ .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ .create();
+
+ ConsumerBase consumerBase;
+ if (partitionedTopic) {
+ MultiTopicsReaderImpl<String> reader =
+ ((CompletableFuture<MultiTopicsReaderImpl<String>>) FieldUtils
+ .readDeclaredField(tv1, "reader", true)).get();
+ consumerBase = spy(reader.getMultiTopicsConsumer());
+ FieldUtils.writeDeclaredField(reader, "multiTopicsConsumer", consumerBase, true);
+ } else {
+ ReaderImpl<String> reader = ((CompletableFuture<ReaderImpl<String>>) FieldUtils
+ .readDeclaredField(tv1, "reader", true)).get();
+ consumerBase = spy(reader.getConsumer());
+ FieldUtils.writeDeclaredField(reader, "consumer", consumerBase, true);
+ }
+
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
+
+ int msgCount = 20;
+ for (int i = 0; i < msgCount; i++) {
+ producer.newMessage().key("key:" + i).value("value" + i).send();
+ }
+
+ Awaitility.await()
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(Duration.ofMillis(5000))
+ .untilAsserted(()
+ -> verify(consumerBase, times(msgCount)).acknowledgeCumulativeAsync(any(MessageId.class)));
+
+
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 3ec95386cb8..cbb921ed9d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
@Slf4j
public class MultiTopicsReaderImpl<T> implements Reader<T> {
@@ -146,7 +147,8 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
@Override
public CompletableFuture<Message<T>> readNextAsync() {
- return multiTopicsConsumer.receiveAsync().thenApply(msg -> {
+ CompletableFuture<Message<T>> originalFuture = multiTopicsConsumer.receiveAsync();
+ CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
multiTopicsConsumer.acknowledgeCumulativeAsync(msg)
.exceptionally(ex -> {
log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
@@ -155,6 +157,10 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
});
return msg;
});
+ CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+ handler.attachToFuture(result);
+ handler.setCancelAction(() -> originalFuture.cancel(false));
+ return result;
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index c4b5263736e..04f8706f21c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -42,6 +42,7 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
@Slf4j
public class ReaderImpl<T> implements Reader<T> {
@@ -177,17 +178,20 @@ public class ReaderImpl<T> implements Reader<T> {
@Override
public CompletableFuture<Message<T>> readNextAsync() {
- CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
- receiveFuture.whenComplete((msg, t) -> {
- if (msg != null) {
- consumer.acknowledgeCumulativeAsync(msg).exceptionally(ex -> {
- log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
- getConsumer().getSubscription(), msg.getMessageId(), ex);
- return null;
- });
- }
+ CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
+ CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+ consumer.acknowledgeCumulativeAsync(msg)
+ .exceptionally(ex -> {
+ log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+ getConsumer().getSubscription(), msg.getMessageId(), ex);
+ return null;
+ });
+ return msg;
});
- return receiveFuture;
+ CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+ handler.attachToFuture(result);
+ handler.setCancelAction(() -> originalFuture.cancel(false));
+ return result;
}
@Override