You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:12 UTC

[pulsar] 08/26: TableView should cache created readers (#15178)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bb52721b88cbf3f8772481076c6f9047f65ca9b8
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Tue Apr 19 22:18:58 2022 -0700

    TableView should cache created readers (#15178)
    
    (cherry picked from commit b1225fecd8a04106667fe09e98960829e39376af)
---
 .../java/org/apache/pulsar/client/impl/TableViewImpl.java  | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index dc32bd008a5..483b2c1ee63 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -224,9 +224,23 @@ public class TableViewImpl<T> implements TableView<T> {
                 .readCompacted(true)
                 .poolMessages(true)
                 .createAsync()
+                .thenCompose(this::cacheNewReader)
                 .thenCompose(this::readAllExistingMessages);
     }
 
+    // Caches the reader under its topic; fails the returned future if that topic already has a reader.
+    private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
+        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
+        // Check and insert in one map call (atomic on a ConcurrentMap) instead of containsKey then put.
+        if (this.readers.putIfAbsent(reader.getTopic(), reader) != null) {
+            future.completeExceptionally(
+                    new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
+        } else {
+            future.complete(reader);
+        }
+        return future;
+    }
+
     private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();