You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/17 09:52:11 UTC

[pulsar] branch master updated: [refactor][client] Reusing multi-topics reader to implement TableView auto-update partition (#15589)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08a7a3eb78d [refactor][client] Reusing multi-topics reader to implement TableView auto-update partition (#15589)
08a7a3eb78d is described below

commit 08a7a3eb78d06469b3cef7a8e7c1f48ff244ea03
Author: Kai Wang <kw...@streamnative.io>
AuthorDate: Tue May 17 17:52:02 2022 +0800

    [refactor][client] Reusing multi-topics reader to implement TableView auto-update partition (#15589)
    
    ### Motivation
    
    Currently, `TableViewImpl` itself maintains one reader per partition in order to support auto-updating partitions. However, the reader already supports multi-partition topics and auto-updating partitions, so we should reuse `MultiTopicsReaderImpl` to implement the TableView.
    
    ### Modifications
    
    Reuse `MultiTopicsReaderImpl` to implement partition auto-update in TableView.
---
 .../apache/pulsar/client/impl/TableViewTest.java   | 57 --------------
 .../apache/pulsar/client/impl/TableViewImpl.java   | 92 +++-------------------
 2 files changed, 12 insertions(+), 137 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
index 15c41041b47..dc039f225b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TableViewTest.java
@@ -30,16 +30,12 @@ import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
-import org.apache.pulsar.common.util.FutureUtil;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -182,57 +178,4 @@ public class TableViewTest extends MockedPulsarServiceBaseTest {
         });
         Assert.assertEquals(tv.keySet(), keys2);
     }
-
-
-    @Test(timeOut = 30 * 1000)
-    // Regression test for making sure partition changes are always periodically checked even after a check returned
-    // exceptionally.
-    public void testTableViewUpdatePartitionsTriggeredDespiteExceptions() throws Exception {
-        String topic = "persistent://public/default/tableview-test-update-partitions-triggered-despite-exceptions";
-        admin.topics().createPartitionedTopic(topic, 3);
-        int count = 20;
-        Set<String> keys = this.publishMessages(topic, count, false);
-        PulsarClient spyPulsarClient = Mockito.spy(pulsarClient);
-        @Cleanup
-        TableView<byte[]> tv = spyPulsarClient.newTableViewBuilder(Schema.BYTES)
-                .topic(topic)
-                .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
-                .create();
-        log.info("start tv size: {}", tv.size());
-        tv.forEachAndListen((k, v) -> log.info("{} -> {}", k, new String(v)));
-        Awaitility.await().untilAsserted(() -> {
-            log.info("Current tv size: {}", tv.size());
-            Assert.assertEquals(tv.size(), count);
-        });
-        Assert.assertEquals(tv.keySet(), keys);
-        tv.forEachAndListen((k, v) -> log.info("checkpoint {} -> {}", k, new String(v)));
-
-        // Let update partition check throw an exception
-        Mockito.doReturn(FutureUtil.failedFuture(new PulsarClientException("")))
-                .when(spyPulsarClient)
-                .getPartitionsForTopic(Mockito.any());
-
-        admin.topics().updatePartitionedTopic(topic, 4);
-        TopicName topicName = TopicName.get(topic);
-
-        // Make sure the get partitions callback is called; it should throw an exception
-        Mockito.verify(spyPulsarClient).getPartitionsForTopic(Mockito.any());
-
-        // Send more data to partition 3, which is not in the current TableView, need update partitions
-        Set<String> keys2 =
-                this.publishMessages(topicName.getPartition(3).toString(), count * 2, false);
-
-        // Wait for 10 seconds; verify that the messages haven't arrived, which would have happened if the partitions
-        // has been updated
-        TimeUnit.SECONDS.sleep(10);
-        Assert.assertEquals(tv.size(), count);
-
-        // Let update partition check succeed, and check the messages eventually arrives
-        Mockito.doCallRealMethod().when(spyPulsarClient).getPartitionsForTopic(Mockito.any());
-        Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
-            log.info("Current tv size: {}", tv.size());
-            Assert.assertEquals(tv.size(), count * 2);
-        });
-        Assert.assertEquals(tv.keySet(), keys2);
-    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
index 6638d416886..02019ae0cfa 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -19,11 +19,9 @@
 
 package org.apache.pulsar.client.impl;
 
-import io.netty.util.Timeout;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -42,76 +39,39 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TableView;
-import org.apache.pulsar.common.util.FutureUtil;
 
 @Slf4j
 public class TableViewImpl<T> implements TableView<T> {
 
-    private final PulsarClientImpl client;
-    private final Schema<T> schema;
     private final TableViewConfigurationData conf;
 
     private final ConcurrentMap<String, T> data;
     private final Map<String, T> immutableData;
 
-    private final ConcurrentMap<String, Reader<T>> readers;
+    private final CompletableFuture<Reader<T>> reader;
 
     private final List<BiConsumer<String, T>> listeners;
     private final ReentrantLock listenersMutex;
 
     TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
-        this.client = client;
-        this.schema = schema;
         this.conf = conf;
         this.data = new ConcurrentHashMap<>();
         this.immutableData = Collections.unmodifiableMap(data);
-        this.readers = new ConcurrentHashMap<>();
         this.listeners = new ArrayList<>();
         this.listenersMutex = new ReentrantLock();
+        this.reader = client.newReader(schema)
+                .topic(conf.getTopicName())
+                .startMessageId(MessageId.earliest)
+                .autoUpdatePartitions(true)
+                .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
+                .readCompacted(true)
+                .poolMessages(true)
+                .createAsync();
     }
 
     CompletableFuture<TableView<T>> start() {
-        return client.getPartitionsForTopic(conf.getTopicName())
-                .thenCompose(partitions -> {
-                    Set<String> partitionsSet = new HashSet<>(partitions);
-                    List<CompletableFuture<?>> futures = new ArrayList<>();
-
-                    // Add new Partitions
-                    partitions.forEach(partition -> {
-                        if (!readers.containsKey(partition)) {
-                            futures.add(newReader(partition));
-                        }
-                    });
-
-                    // Remove partitions that are not used anymore
-                    readers.forEach((existingPartition, existingReader) -> {
-                        if (!partitionsSet.contains(existingPartition)) {
-                            futures.add(existingReader.closeAsync()
-                                    .thenRun(() -> readers.remove(existingPartition, existingReader)));
-                        }
-                    });
-
-                    return FutureUtil.waitForAll(futures)
-                            .thenRun(() -> schedulePartitionsCheck());
-                }).thenApply(__ -> this);
-    }
-
-    private void schedulePartitionsCheck() {
-        client.timer()
-                .newTimeout(this::checkForPartitionsChanges, conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS);
-    }
-
-    private void checkForPartitionsChanges(Timeout timeout) {
-        if (timeout.isCancelled()) {
-            return;
-        }
-
-        start().whenComplete((tw, ex) -> {
-           if (ex != null) {
-               log.warn("Failed to check for changes in number of partitions:", ex);
-               schedulePartitionsCheck();
-           }
-        });
+        return reader.thenCompose(this::readAllExistingMessages)
+                .thenApply(__ -> this);
     }
 
     @Override
@@ -171,11 +131,7 @@ public class TableViewImpl<T> implements TableView<T> {
 
     @Override
     public CompletableFuture<Void> closeAsync() {
-        return FutureUtil.waitForAll(
-                readers.values().stream()
-                        .map(Reader::closeAsync)
-                        .collect(Collectors.toList())
-        );
+        return reader.thenCompose(Reader::closeAsync);
     }
 
     @Override
@@ -217,30 +173,6 @@ public class TableViewImpl<T> implements TableView<T> {
         }
     }
 
-    private CompletableFuture<Reader<T>> newReader(String partition) {
-        return client.newReader(schema)
-                .topic(partition)
-                .startMessageId(MessageId.earliest)
-                .readCompacted(true)
-                .poolMessages(true)
-                .createAsync()
-                .thenCompose(this::cacheNewReader)
-                .thenCompose(this::readAllExistingMessages);
-    }
-
-    private CompletableFuture<Reader<T>> cacheNewReader(Reader<T> reader) {
-        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
-        if (this.readers.containsKey(reader.getTopic())) {
-            future.completeExceptionally(
-                    new IllegalArgumentException("reader on partition " + reader.getTopic() + " already existed"));
-        } else {
-            this.readers.put(reader.getTopic(), reader);
-            future.complete(reader);
-        }
-
-        return future;
-    }
-
     private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
         long startTime = System.nanoTime();
         AtomicLong messagesRead = new AtomicLong();