Posted to commits@pulsar.apache.org by "codelipenghui (via GitHub)" <gi...@apache.org> on 2023/10/26 03:26:15 UTC

Re: [PR] [fix][client] Avert extensive time consumption during table view construction [pulsar]

codelipenghui commented on code in PR #21270:
URL: https://github.com/apache/pulsar/pull/21270#discussion_r1372529642


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
         AtomicLong messagesRead = new AtomicLong();
 
         CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+            Map<String, TopicMessageId> maxMessageIds = new HashMap<>();
+            lastMessageIds.forEach(topicMessageId -> {
+                maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
+            });
+            readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);

Review Comment:
   It would be better to make the `maxMessageIds` map that is passed to `readAllExistingMessages` immutable, so that no other thread gets a chance to update the HashMap.
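
   A minimal sketch of one way to get a read-only map, using only the JDK. `ImmutableSnapshot` and `snapshot` are hypothetical names, and plain strings stand in for `TopicMessageId`; none of this is taken from the PR. Note that the loop in the later hunk calls `maxMessageIds.remove(...)`, so an immutable map would need that bookkeeping to be tracked some other way.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class ImmutableSnapshot {
       // Hypothetical helper: builds a read-only topic -> last-message-id map.
       // Each entry is {ownerTopic, messageId}; Strings stand in for TopicMessageId.
       static Map<String, String> snapshot(List<String[]> lastMessageIds) {
           Map<String, String> ids = new HashMap<>();
           for (String[] entry : lastMessageIds) {
               ids.put(entry[0], entry[1]);
           }
           // The backing HashMap never escapes this method, so callers only
           // ever see the unmodifiable view.
           return Collections.unmodifiableMap(ids);
       }

       public static void main(String[] args) {
           Map<String, String> ids =
                   snapshot(Collections.singletonList(new String[] {"topic-0", "5:10"}));
           try {
               ids.remove("topic-0");
           } catch (UnsupportedOperationException e) {
               // Any attempt to mutate the view is rejected.
               System.out.println("read-only");
           }
       }
   }
   ```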



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
         AtomicLong messagesRead = new AtomicLong();
 
         CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+            Map<String, TopicMessageId> maxMessageIds = new HashMap<>();
+            lastMessageIds.forEach(topicMessageId -> {
+                maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
+            });
+            readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
+        });

Review Comment:
   The exception from `reader.getLastMessageIdsAsync()` should be handled. Otherwise, if that call fails, the `future` will never be completed.
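
   A minimal sketch of the pattern, using only `java.util.concurrent`. `FailurePropagation`, `start`, and the `lookup` parameter are hypothetical stand-ins (with `lookup` playing the role of `reader.getLastMessageIdsAsync()`), not code from the PR.

   ```java
   import java.util.concurrent.CompletableFuture;

   public class FailurePropagation {
       // Hypothetical stand-in for readAllExistingMessages(reader): the String
       // payload is illustrative only.
       static CompletableFuture<String> start(CompletableFuture<String> lookup) {
           CompletableFuture<String> future = new CompletableFuture<>();
           lookup.thenAccept(ids -> future.complete("read up to " + ids))
                 .exceptionally(ex -> {
                     // Without this branch, a failed lookup would leave
                     // `future` pending forever.
                     future.completeExceptionally(ex);
                     return null;
                 });
           return future;
       }

       public static void main(String[] args) {
           CompletableFuture<String> failing = new CompletableFuture<>();
           failing.completeExceptionally(new IllegalStateException("lookup failed"));
           CompletableFuture<String> result = start(failing);
           System.out.println(result.isCompletedExceptionally());
       }
   }
   ```

   With the `exceptionally` stage in place, callers waiting on `future` see the failure instead of blocking indefinitely.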



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java:
##########
@@ -235,20 +237,36 @@ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
         AtomicLong messagesRead = new AtomicLong();
 
         CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        readAllExistingMessages(reader, future, startTime, messagesRead);
+        reader.getLastMessageIdsAsync().thenAccept(lastMessageIds -> {
+            Map<String, TopicMessageId> maxMessageIds = new HashMap<>();
+            lastMessageIds.forEach(topicMessageId -> {
+                maxMessageIds.put(topicMessageId.getOwnerTopic(), topicMessageId);
+            });
+            readAllExistingMessages(reader, future, startTime, messagesRead, maxMessageIds);
+        });
         return future;
     }
 
     private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
-                                         AtomicLong messagesRead) {
+                                         AtomicLong messagesRead, Map<String, TopicMessageId> maxMessageIds) {
         reader.hasMessageAvailableAsync()
                 .thenAccept(hasMessage -> {
                    if (hasMessage) {
                        reader.readNextAsync()
                                .thenAccept(msg -> {
                                   messagesRead.incrementAndGet();
                                   handleMessage(msg);
-                                  readAllExistingMessages(reader, future, startTime, messagesRead);
+                                  // The message is read one by one in a single thread,
+                                  // so it's fine that uses a hashmap to store last message ID.
+                                  TopicMessageId maxMessageId = maxMessageIds.get(msg.getTopicName());
+                                  if (maxMessageId != null && msg.getMessageId().compareTo(maxMessageId) >= 0) {
+                                      maxMessageIds.remove(msg.getTopicName());
+                                  }
+                                  if (maxMessageIds.size() == 0) {

Review Comment:
   nit
   
   ```suggestion
                                     if (maxMessageIds.isEmpty()) {
   ```


