You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/09/25 04:07:49 UTC
[pulsar] branch branch-2.10 updated: [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728) (#17828)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new bbccb62d9d0 [fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728) (#17828)
bbccb62d9d0 is described below
commit bbccb62d9d0a0176520bf9ada5e13c251366abc3
Author: Heesung Sohn <10...@users.noreply.github.com>
AuthorDate: Sat Sep 24 21:07:44 2022 -0700
[fix][tableview] fixed ack failure in ReaderImpl due to null messageId (#17728) (#17828)
---
.../pulsar/client/impl/MultiTopicsReaderTest.java | 24 ++++++++++++++++++++++
.../apache/pulsar/client/impl/TableViewTest.java | 1 -
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 18 +++++++++++++---
.../org/apache/pulsar/client/impl/ReaderImpl.java | 22 ++++++++++++++------
4 files changed, 55 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index 6b6bf959483..edb5f0cd88d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
import org.testng.Assert;
+import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -625,6 +626,29 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
}
+ @Test
+ void shouldSupportCancellingReadNextAsync() throws Exception {
+ String topic = "persistent://my-property/my-ns/my-reader-topic" + UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 3);
+ MultiTopicsReaderImpl<byte[]> reader = (MultiTopicsReaderImpl<byte[]>) pulsarClient.newReader()
+ .topic(topic)
+ .startMessageId(MessageId.earliest)
+ .readerName(subscription)
+ .create();
+ // given
+ CompletableFuture<Message<byte[]>> future = reader.readNextAsync();
+ Awaitility.await().untilAsserted(() -> {
+ AssertJUnit.assertTrue(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+ });
+
+ // when
+ future.cancel(false);
+
+ // then
+ AssertJUnit.assertFalse(reader.getMultiTopicsConsumer().hasNextPendingReceive());
+ }
+
+
private void testReadMessages(String topic, boolean enableBatch) throws Exception {
int numKeys = 9;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index de56b1a5e99..d91248056bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -222,7 +222,6 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
assertEquals(tv.get("key2"), "value2");
}
-
@Test(timeOut = 30 * 1000)
// Regression test for making sure partition changes are always periodically checked even after a check returned
// exceptionally.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index 7d22d9d9619..e4f4d448329 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -18,12 +18,12 @@
*/
package org.apache.pulsar.client.impl;
-
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
@@ -40,7 +40,9 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+@Slf4j
public class MultiTopicsReaderImpl<T> implements Reader<T> {
private final MultiTopicsConsumerImpl<T> multiTopicsConsumer;
@@ -128,10 +130,20 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
@Override
public CompletableFuture<Message<T>> readNextAsync() {
- return multiTopicsConsumer.receiveAsync().thenApply(msg -> {
- multiTopicsConsumer.acknowledgeCumulativeAsync(msg);
+ CompletableFuture<Message<T>> originalFuture = multiTopicsConsumer.receiveAsync();
+ CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+ multiTopicsConsumer.acknowledgeCumulativeAsync(msg)
+ .exceptionally(ex -> {
+ log.warn("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+ getMultiTopicsConsumer().getSubscription(), msg.getMessageId(), ex);
+ return null;
+ });
return msg;
});
+ CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+ handler.attachToFuture(result);
+ handler.setCancelAction(() -> originalFuture.cancel(false));
+ return result;
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index 25776582bbf..d94084a4e91 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -23,6 +23,7 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatchReceivePolicy;
@@ -41,7 +42,9 @@ import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
+@Slf4j
public class ReaderImpl<T> implements Reader<T> {
private static final BatchReceivePolicy DISABLED_BATCH_RECEIVE_POLICY = BatchReceivePolicy.builder()
.timeout(0, TimeUnit.MILLISECONDS)
@@ -157,13 +160,20 @@ public class ReaderImpl<T> implements Reader<T> {
@Override
public CompletableFuture<Message<T>> readNextAsync() {
- CompletableFuture<Message<T>> receiveFuture = consumer.receiveAsync();
- receiveFuture.whenComplete((msg, t) -> {
- if (msg != null) {
- consumer.acknowledgeCumulativeAsync(msg);
- }
+ CompletableFuture<Message<T>> originalFuture = consumer.receiveAsync();
+ CompletableFuture<Message<T>> result = originalFuture.thenApply(msg -> {
+ consumer.acknowledgeCumulativeAsync(msg)
+ .exceptionally(ex -> {
+ log.error("[{}][{}] acknowledge message {} cumulative fail.", getTopic(),
+ getConsumer().getSubscription(), msg.getMessageId(), ex);
+ return null;
+ });
+ return msg;
});
- return receiveFuture;
+ CompletableFutureCancellationHandler handler = new CompletableFutureCancellationHandler();
+ handler.attachToFuture(result);
+ handler.setCancelAction(() -> originalFuture.cancel(false));
+ return result;
}
@Override