You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 13:04:12 UTC
[pulsar] 08/26: TableView should cache created readers (#15178)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bb52721b88cbf3f8772481076c6f9047f65ca9b8
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Tue Apr 19 22:18:58 2022 -0700
TableView should cache created readers (#15178)
(cherry picked from commit b1225fecd8a04106667fe09e98960829e39376af)
---
.../java/org/apache/pulsar/client/impl/TableViewImpl.java | 14 ++++++++++++++
1 file changed, 14 insertions(+)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index dc32bd008a5..483b2c1ee63 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -224,9 +224,23 @@ public class TableViewImpl<T> implements TableView<T> {
.readCompacted(true)
.poolMessages(true)
.createAsync()
+ .thenCompose(this::cacheNewReader)
.thenCompose(this::readAllExistingMessages);
}
+ private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
+ CompletableFuture<Reader<T>> future = new CompletableFuture<>();
+ if (this.readers.containsKey(reader.getTopic())) {
+ future.completeExceptionally(
+ new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
+ } else {
+ this.readers.put(reader.getTopic(), reader);
+ future.complete(reader);
+ }
+
+ return future;
+ }
+
private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
long startTime = System.nanoTime();
AtomicLong messagesRead = new AtomicLong();