You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by yo...@apache.org on 2021/12/23 14:41:11 UTC

[pulsar] 05/08: [pulsar-client] Fix multi topic reader has message available behavior (#13332)

This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit d0eac9a2b157209f80659966649200906086a9ee
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Thu Dec 23 16:36:27 2021 +0800

    [pulsar-client] Fix multi topic reader has message available behavior (#13332)
    
    ### Motivation
    
    When we use a multi-topic reader, the `hasMessageAvailable` method might have the wrong behavior, since the multi-topics consumer receives all messages from the single-topic consumer, the single-topic consumer `hasMessageAvailable` might always be `false` (The lastDequeuedMessageId reach to the end of the queue, all message enqueue to multi-topic consumer's `incomingMessages` queue).
    
    We should check the multi-topics consumer  `incomingMessages` size > 0 when calling `hasMessageAvailable `.
    
    ### Modifications
    
    1. Add a check of `incomingMessages` size > 0
    2. Add units test `testHasMessageAvailableAsync` to verify the behavior.
    
    (cherry picked from commit 6c7dcc0cf877cfcb8bcea18cde7662ebacb01d4c)
---
 .../pulsar/client/impl/MultiTopicsReaderTest.java  | 67 ++++++++++++++++++++++
 .../client/impl/MultiTopicsConsumerImpl.java       |  3 +
 .../pulsar/client/impl/MultiTopicsReaderImpl.java  |  2 +-
 3 files changed, 71 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
index a8a6ced..f6230e2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MultiTopicsReaderTest.java
@@ -26,15 +26,19 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -43,6 +47,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Range;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -61,6 +66,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "flaky")
+@Slf4j
 public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
 
     private static final String subscription = "reader-multi-topics-sub";
@@ -122,6 +128,67 @@ public class MultiTopicsReaderTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test(timeOut = 10000)
+    public void testHasMessageAvailableAsync() throws Exception {
+        String topic = "persistent://my-property/my-ns/testHasMessageAvailableAsync";
+        String content = "my-message-";
+        int msgNum = 10;
+        admin.topics().createPartitionedTopic(topic, 2);
+        // stop retention from cleaning up
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").subscribe().close();
+
+        try (Reader<byte[]> reader = pulsarClient.newReader().topic(topic).readCompacted(true)
+                .startMessageId(MessageId.earliest).create()) {
+            Assert.assertFalse(reader.hasMessageAvailable());
+            Assert.assertFalse(reader.hasMessageAvailableAsync().get(10, TimeUnit.SECONDS));
+        }
+
+        try (Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic).startMessageId(MessageId.earliest).create()) {
+            try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create()) {
+                for (int i = 0; i < msgNum; i++) {
+                    producer.newMessage().key(content + i)
+                            .value((content + i).getBytes(StandardCharsets.UTF_8)).send();
+                }
+            }
+            // Should have message available
+            Assert.assertTrue(reader.hasMessageAvailableAsync().get());
+            try {
+                // Should have message available too
+                Assert.assertTrue(reader.hasMessageAvailable());
+            } catch (PulsarClientException e) {
+                fail("Expect success but failed.", e);
+            }
+            List<Message<byte[]>> msgs = Collections.synchronizedList(new ArrayList<>());
+            CountDownLatch latch = new CountDownLatch(1);
+            readMessageUseAsync(reader, msgs, latch);
+            latch.await();
+            Assert.assertEquals(msgs.size(), msgNum);
+        }
+    }
+
+    private static <T> void readMessageUseAsync(Reader<T> reader, List<Message<T>> msgs, CountDownLatch latch) {
+        reader.hasMessageAvailableAsync().thenAccept(hasMessageAvailable -> {
+            if (hasMessageAvailable) {
+                try {
+                    Message<T> msg = reader.readNext();
+                    msgs.add(msg);
+                } catch (PulsarClientException e) {
+                    log.error("Read message failed.", e);
+                    latch.countDown();
+                    return;
+                }
+                readMessageUseAsync(reader, msgs, latch);
+            } else {
+                latch.countDown();
+            }
+        }).exceptionally(throwable -> {
+            log.error("Read message failed.", throwable);
+            latch.countDown();
+            return null;
+        });
+    }
+
+    @Test(timeOut = 10000)
     public void testReadMessageWithBatchingWithMessageInclusive() throws Exception {
         String topic = "persistent://my-property/my-ns/my-reader-topic-with-batching-inclusive" + UUID.randomUUID();
         int topicNum = 3;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index faa2d65..177cc9f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -754,6 +754,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     public CompletableFuture<Boolean> hasMessageAvailableAsync() {
+        if (numMessagesInQueue() > 0) {
+            return CompletableFuture.completedFuture(true);
+        }
         List<CompletableFuture<Void>> futureList = new ArrayList<>();
         final AtomicBoolean hasMessageAvailable = new AtomicBoolean(false);
         for (ConsumerImpl<T> consumer : consumers.values()) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
index d25d1ba..9cab114 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
@@ -141,7 +141,7 @@ public class MultiTopicsReaderImpl<T> implements Reader<T> {
 
     @Override
     public boolean hasMessageAvailable() throws PulsarClientException {
-        return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0;
+        return multiTopicsConsumer.hasMessageAvailable();
     }
 
     @Override