You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:41:11 UTC
[pulsar] 05/08: [pulsar-client] Fix multi topic reader has message available behavior (#13332)
This is an automated email from the ASF dual-hosted git repository.
yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d0eac9a2b157209f80659966649200906086a9ee
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Thu Dec 23 16:36:27 2021 +0800
[pulsar-client] Fix multi topic reader has message available behavior (#13332)
### Motivation
When we use a multi-topic reader, the `hasMessageAvailable` method might behave incorrectly. Because the multi-topics consumer receives all messages from the single-topic consumers, a single-topic consumer's `hasMessageAvailable` might always return `false` (its `lastDequeuedMessageId` has reached the end of the queue, and all messages have already been enqueued to the multi-topics consumer's `incomingMessages` queue).
We should check whether the multi-topics consumer's `incomingMessages` size is > 0 when calling `hasMessageAvailable`.
### Modifications
1. Add a check of `incomingMessages` size > 0
2. Add unit test `testHasMessageAvailableAsync` to verify the behavior.
(cherry picked from commit 6c7dcc0cf877cfcb8bcea18cde7662ebacb01d4c)
---
.../pulsar/client/impl/MultiTopicsReaderTest.java | 67 ++++++++++++++++++++++
.../client/impl/MultiTopicsConsumerImpl.java | 3 +
.../pulsar/client/impl/MultiTopicsReaderImpl.java | 2 +-
3 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a8a6ced..f6230e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -26,15 +26,19 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -43,6 +47,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
@@ -61,6 +66,7 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "flaky")
+@Slf4j
public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
private static final String subscription = "reader-multi-topics-sub";
@@ -122,6 +128,67 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
}
@Test(timeOut = 10000)
+ public void testHasMessageAvailableAsync() throws Exception {
+ String topic = "persistent://my-property/my-ns/testHasMessageAvailableAsync";
+ String content = "my-message-";
+ int msgNum = 10;
+ admin.topics().createPartitionedTopic(topic, 2);
+ // stop retention from cleaning up
+ pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+ try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(true)
+ .startMessageId(MessageId.earliest).create()) {
+ Assert.assertFalse(reader.hasMessageAvailable());
+ Assert.assertFalse(reader.hasMessageAvailableAsync().get(10, TimeUnit.SECONDS));
+ }
+
+ try (Reader<byte[]> reader = pulsarClient.newReader()
+ .topic(topic).startMessageId(MessageId.earliest).create()) {
+ try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
+ for (int i = 0; i < msgNum; i++) {
+ producer.newMessage().key(content + i)
+ .value((content + i).getBytes(StandardCharsets.UTF_8)).send();
+ }
+ }
+ // Should have message available
+ Assert.assertTrue(reader.hasMessageAvailableAsync().get());
+ try {
+ // Should have message available too
+ Assert.assertTrue(reader.hasMessageAvailable());
+ } catch (PulsarClientException e) {
+ fail("Expect success but failed.", e);
+ }
+ List<Message<byte[]>> msgs = Collections.synchronizedList(new ArrayList<>());
+ CountDownLatch latch = new CountDownLatch(1);
+ readMessageUseAsync(reader, msgs, latch);
+ latch.await();
+ Assert.assertEquals(msgs.size(), msgNum);
+ }
+ }
+
+ private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
+ reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
+ if (hasMessageAvailable) {
+ try {
+ Message<T> msg = reader.readNext();
+ msgs.add(msg);
+ } catch (PulsarClientException e) {
+ log.error("Read message failed.", e);
+ latch.countDown();
+ return;
+ }
+ readMessageUseAsync(reader, msgs, latch);
+ } else {
+ latch.countDown();
+ }
+ }).exceptionally(throwable -> {
+ log.error("Read message failed.", throwable);
+ latch.countDown();
+ return null;
+ });
+ }
+
+ @Test(timeOut = 10000)
public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID();
int topicNum = 3;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index faa2d65..177cc9f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -754,6 +754,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+ if (numMessagesInQueue() > 0) {
+ return CompletableFuture.completedFuture(true);
+ }
List<CompletableFuture<Void>> futureList = new ArrayList<>();
final AtomicBoolean hasMessageAvailable = new AtomicBoolean(false);
for (ConsumerImpl<T> consumer : consumers.values()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index d25d1ba..9cab114 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -141,7 +141,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
@Override
public boolean hasMessageAvailable() throws PulsarClientException {
- return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0;
+ return multiTopicsConsumer.hasMessageAvailable();
}
@Override