You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/17 09:52:11 UTC
[pulsar] branch master updated: [refactor][client] Reusing multi-topics reader to implement TableView auto-update partition (#15589)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 08a7a3eb78d [refactor][client] Reusing multi-topics reader to implement TableView auto-update partition (#15589)
08a7a3eb78d is described below
commit 08a7a3eb78d06469b3cef7a8e7c1f48ff244ea03
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Tue May 17 17:52:02 2022 +0800
[refactor][client] Reusing multi-topics reader to implement TableView auto-update partition (#15589)
### Motivation
Currently, the TableViewImpl itself maintains the reader per partition to support auto-update partition, but the reader support muti-partition topic and auto-update partitions, we should reuse the `MultiTopicsReaderImpl` to implement the TableView.
### Modifications
Reusing `MultiTopicsReaderImpl` to implement TableView auto-update partition.
---
.../apache/pulsar/client/impl/TableViewTest.java | 57 --------------
.../apache/pulsar/client/impl/TableViewImpl.java | 92 +++-------------------
2 files changed, 12 insertions(+), 137 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 15c41041b47..dc039f225b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -30,16 +30,12 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
-import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -182,57 +178,4 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
});
Assert.assertEquals(tv.keySet(), keys2);
}
-
-
- @Test(timeOut = 30 * 1000)
- // Regression test for making sure partition changes are always periodically checked even after a check returned
- // exceptionally.
- public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception {
- String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions";
- admin.topics().createPartitionedTopic(topic, 3);
- int count = 20;
- Set<String> keys = this.publishMessages(topic, count, false);
- PulsarClient spyPulsarClient = Mockito.spy(pulsarClient);
- @Cleanup
- TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES)
- .topic(topic)
- .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
- .create();
- log.info("start tv size: {}", tv.size());
- tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
- Awaitility.await().untilAsserted(() -> {
- log.info("Current tv size: {}", tv.size());
- Assert.assertEquals(tv.size(), count);
- });
- Assert.assertEquals(tv.keySet(), keys);
- tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
-
- // Let update partition check throw an exception
- Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("")))
- .when(spyPulsarClient)
- .getPartitionsForTopic(Mockito.any());
-
- admin.topics().updatePartitionedTopic(topic, 4);
- TopicName topicName = TopicName.get(topic);
-
- // Make sure the get partitions callback is called; it should throw an exception
- Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any());
-
- // Send more data to partition 3, which is not in the current TableView, need update partitions
- Set<String> keys2 =
- this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
-
- // Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions
- // has been updated
- TimeUnit.SECONDS.sleep(10);
- Assert.assertEquals(tv.size(), count);
-
- // Let update partition check succeed, and check the messages eventually arrives
- Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any());
- Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
- log.info("Current tv size: {}", tv.size());
- Assert.assertEquals(tv.size(), count * 2);
- });
- Assert.assertEquals(tv.keySet(), keys2);
- }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 6638d416886..02019ae0cfa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -19,11 +19,9 @@
package org.apache.pulsar.client.impl;
-import io.netty.util.Timeout;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -42,76 +39,39 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
-import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class TableViewImpl<T> implements TableView<T> {
- private final PulsarClientImpl client;
- private final Schema<T> schema;
private final TableViewConfigurationData conf;
private final ConcurrentMap<String, T> data;
private final Map<String, T> immutableData;
- private final ConcurrentMap<String, Reader<T>> readers;
+ private final CompletableFuture<Reader<T>> reader;
private final List<BiConsumer<String, T>> listeners;
private final ReentrantLock listenersMutex;
TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
- this.client = client;
- this.schema = schema;
this.conf = conf;
this.data = new ConcurrentHashMap<>();
this.immutableData = Collections.unmodifiableMap(data);
- this.readers = new ConcurrentHashMap<>();
this.listeners = new ArrayList<>();
this.listenersMutex = new ReentrantLock();
+ this.reader = client.newReader(schema)
+ .topic(conf.getTopicName())
+ .startMessageId(MessageId.earliest)
+ .autoUpdatePartitions(true)
+ .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
+ .readCompacted(true)
+ .poolMessages(true)
+ .createAsync();
}
CompletableFuture<TableView<T>> start() {
- return client.getPartitionsForTopic(conf.getTopicName())
- .thenCompose(partitions -> {
- Set<String> partitionsSet = new HashSet<>(partitions);
- List<CompletableFuture<?>> futures = new ArrayList<>();
-
- // Add new Partitions
- partitions.forEach(partition -> {
- if (!readers.containsKey(partition)) {
- futures.add(newReader(partition));
- }
- });
-
- // Remove partitions that are not used anymore
- readers.forEach((existingPartition, existingReader) -> {
- if (!partitionsSet.contains(existingPartition)) {
- futures.add(existingReader.closeAsync()
- .thenRun(() -> readers.remove(existingPartition, existingReader)));
- }
- });
-
- return FutureUtil.waitForAll(futures)
- .thenRun(() -> schedulePartitionsCheck());
- }).thenApply(__ -> this);
- }
-
- private void schedulePartitionsCheck() {
- client.timer()
- .newTimeout(this::checkForPartitionsChanges, conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS);
- }
-
- private void checkForPartitionsChanges(Timeout timeout) {
- if (timeout.isCancelled()) {
- return;
- }
-
- start().whenComplete((tw, ex) -> {
- if (ex != null) {
- log.warn("Failed to check for changes in number of partitions:", ex);
- schedulePartitionsCheck();
- }
- });
+ return reader.thenCompose(this::readAllExistingMessages)
+ .thenApply(__ -> this);
}
@Override
@@ -171,11 +131,7 @@ public class TableViewImpl<T> implements TableView<T> {
@Override
public CompletableFuture<Void> closeAsync() {
- return FutureUtil.waitForAll(
- readers.values().stream()
- .map(Reader::closeAsync)
- .collect(Collectors.toList())
- );
+ return reader.thenCompose(Reader::closeAsync);
}
@Override
@@ -217,30 +173,6 @@ public class TableViewImpl<T> implements TableView<T> {
}
}
- private CompletableFuture<Reader<T>> newReader(String partition) {
- return client.newReader(schema)
- .topic(partition)
- .startMessageId(MessageId.earliest)
- .readCompacted(true)
- .poolMessages(true)
- .createAsync()
- .thenCompose(this::cacheNewReader)
- .thenCompose(this::readAllExistingMessages);
- }
-
- private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
- CompletableFuture<Reader<T>> future = new CompletableFuture<>();
- if (this.readers.containsKey(reader.getTopic())) {
- future.completeExceptionally(
- new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
- } else {
- this.readers.put(reader.getTopic(), reader);
- future.complete(reader);
- }
-
- return future;
- }
-
private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
long startTime = System.nanoTime();
AtomicLong messagesRead = new AtomicLong();