You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/10 11:09:52 UTC

(pulsar) branch branch-3.0 updated: [fix][client] Avert extensive time consumption during table view construction (#21270)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9de0a88d106 [fix][client] Avert extensive time consumption during table view construction (#21270)
9de0a88d106 is described below

commit 9de0a88d10617236f0114b8537316aafef50e08e
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Nov 6 15:55:07 2023 +0800

    [fix][client] Avert extensive time consumption during table view construction (#21270)
    
    Reopen https://github.com/apache/pulsar/pull/21170
    ### Motivation
    If a topic continuously receives a large volume of incoming messages, reading every message in the topic to build a TableView can take an excessive amount of time.
    ### Modification
    While constructing the TableView, the client first fetches the last message ID of each partition of the topic. Once every partition has been read up to its last message ID, construction completes instead of continuing to read newly arriving messages.
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 60 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/TableViewImpl.java   | 28 ++++++++--
 2 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 6c6da5870ae..523360884c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -20,16 +20,21 @@ package org.apache.pulsar.client.impl;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import com.google.common.collect.Sets;
+import java.lang.reflect.Method;
 import java.time.Duration;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -39,6 +44,7 @@ import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
@@ -46,6 +52,7 @@ import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -438,4 +445,57 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         });
         verify(consumer, times(msgCnt)).receiveAsync();
     }
+
+    @Test
+    public void testBuildTableViewWithMessagesAlwaysAvailable() throws Exception {
+        String topic = "persistent://public/default/testBuildTableViewWithMessagesAlwaysAvailable";
+        admin.topics().createPartitionedTopic(topic, 10);
+        @Cleanup
+        Reader<byte[]> reader = pulsarClient.newReader()
+                .topic(topic)
+                .startMessageId(MessageId.earliest)
+                .create();
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topic)
+                .create();
+        // Produce real messages so the topic has last message IDs to read up to.
+        for (int i = 0; i < 1000; i++) {
+            producer.newMessage().send();
+        }
+        List<TopicMessageId> lastMessageIds = reader.getLastMessageIds();
+
+        // Build the TableView from a mock reader: the old readAllExistingMessages never completed if the reader
+        // always has messages. NOTE(review): only hasMessageAvailable() is stubbed, not hasMessageAvailableAsync().
+        Reader<byte[]> mockReader = spy(reader);
+        when(mockReader.hasMessageAvailable()).thenReturn(true);
+        when(mockReader.getLastMessageIdsAsync()).thenReturn(CompletableFuture.completedFuture(lastMessageIds));
+        AtomicInteger index = new AtomicInteger(lastMessageIds.size());
+        when(mockReader.readNextAsync()).thenAnswer(invocation -> {
+            Message<byte[]> message = spy(Message.class);
+            int localIndex = index.decrementAndGet();
+            if (localIndex >= 0) {
+                when(message.getTopicName()).thenReturn(lastMessageIds.get(localIndex).getOwnerTopic());
+                when(message.getMessageId()).thenReturn(lastMessageIds.get(localIndex));
+                when(message.hasKey()).thenReturn(false);
+                doNothing().when(message).release();
+            }
+            return CompletableFuture.completedFuture(message);
+        });
+        @Cleanup
+        TableViewImpl<byte[]> tableView = (TableViewImpl<byte[]>) pulsarClient.newTableView()
+                .topic(topic)
+                .createAsync()
+                .get();
+        TableViewImpl<byte[]> mockTableView = spy(tableView);
+        Method readAllExistingMessagesMethod = TableViewImpl.class
+                .getDeclaredMethod("readAllExistingMessages", Reader.class);
+        readAllExistingMessagesMethod.setAccessible(true);
+        CompletableFuture<Reader<?>> future =
+                (CompletableFuture<Reader<?>>) readAllExistingMessagesMethod.invoke(mockTableView, mockReader);
+
+        // The future should complete once the messages carrying each of lastMessageIds have been read.
+        future.get(3, TimeUnit.SECONDS);
+        assertTrue(index.get() <= 0);
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 560636f9462..151c96d96aa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.ReaderBuilder;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.client.api.TopicMessageId;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.topics.TopicCompactionStrategy;
 
@@ -235,20 +236,40 @@ public class TableViewImpl<T> implements TableView<T> {
         AtomicLong messagesRead = new AtomicLong();
 
         CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+            Map<String, TopicMessageId> maxMessageIds = new ConcurrentHashMap<>();
+            lastMessageIds.forEach(topicMessageId -> {
+                maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
+            });
+            readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
+        }).exceptionally(ex -> {
+            future.completeExceptionally(ex);
+            return null;
+        });
+        future.thenAccept(__ -> readTailMessages(reader));
         return future;
     }
 
     private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
-                                         AtomicLong messagesRead) {
+                                         AtomicLong messagesRead, Map<String, TopicMessageId> maxMessageIds) {
         reader.hasMessageAvailableAsync()
                 .thenAccept(hasMessage -> {
                    if (hasMessage) {
                        reader.readNextAsync()
                                .thenAccept(msg -> {
                                   messagesRead.incrementAndGet();
+                                  // We need remove the partition from the maxMessageIds map
+                                  // once the partition has been read completely.
+                                  TopicMessageId maxMessageId = maxMessageIds.get(msg.getTopicName());
+                                  if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) {
+                                      maxMessageIds.remove(msg.getTopicName());
+                                  }
                                   handleMessage(msg);
-                                  readAllExistingMessages(reader, future, startTime, messagesRead);
+                                  if (maxMessageIds.isEmpty()) {
+                                      future.complete(reader);
+                                  } else {
+                                      readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
+                                  }
                                }).exceptionally(ex -> {
                                    if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                                        log.error("Reader {} was closed while reading existing messages.",
@@ -269,7 +290,6 @@ public class TableViewImpl<T> implements TableView<T> {
                                messagesRead,
                                durationMillis / 1000.0);
                        future.complete(reader);
-                       readTailMessages(reader);
                    }
                 });
     }